Skip to content

Commit

Permalink
Merge pull request #268 from rramkumar1/add-backend-config-event-hand…
Browse files Browse the repository at this point in the history
…lers

Add event handlers for BackendConfig
  • Loading branch information
nicksardo authored May 15, 2018
2 parents 285e110 + 0298e21 commit aa2a191
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 25 deletions.
11 changes: 9 additions & 2 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/kubernetes"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"

"k8s.io/ingress-gce/pkg/context"
Expand Down Expand Up @@ -72,6 +73,7 @@ func main() {
glog.Fatalf("Failed to create kubernetes client: %v", err)
}

var backendConfigClient backendconfigclient.Interface
if flags.F.EnableBackendConfig {
crdClient, err := crdclient.NewForConfig(kubeConfig)
if err != nil {
Expand All @@ -81,6 +83,11 @@ func main() {
if _, err := backendconfig.EnsureCRD(crdClient); err != nil {
glog.Fatalf("Failed to ensure BackendConfig CRD: %v", err)
}

backendConfigClient, err = backendconfigclient.NewForConfig(kubeConfig)
if err != nil {
glog.Fatalf("Failed to create BackendConfig client: %v", err)
}
}

namer, err := app.NewNamer(kubeClient, flags.F.ClusterName, controller.DefaultFirewallName)
Expand All @@ -97,8 +104,8 @@ func main() {

enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG)
lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG)
ctx := context.NewControllerContext(kubeClient, backendConfigClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG)
lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG, flags.F.EnableBackendConfig)
if err != nil {
glog.Fatalf("Error creating load balancer controller: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/backendconfig/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type BackendConfig struct {

// BackendConfigSpec is the spec for a BackendConfig resource
type BackendConfigSpec struct {
Iap *IAPConfig
Cdn *CDNConfig
}

// BackendConfigStatus is the status for a BackendConfig resource
Expand Down
30 changes: 24 additions & 6 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,33 @@ import (
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned"
informerbackendconfig "k8s.io/ingress-gce/pkg/backendconfig/client/informers/externalversions/backendconfig/v1alpha1"
)

// ControllerContext holds
type ControllerContext struct {
kubeClient kubernetes.Interface

IngressInformer cache.SharedIndexInformer
ServiceInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
NodeInformer cache.SharedIndexInformer
EndpointInformer cache.SharedIndexInformer
IngressInformer cache.SharedIndexInformer
ServiceInformer cache.SharedIndexInformer
BackendConfigInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
NodeInformer cache.SharedIndexInformer
EndpointInformer cache.SharedIndexInformer

// Map of namespace => record.EventRecorder.
recorders map[string]record.EventRecorder
}

// NewControllerContext returns a new shared set of informers.
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, enableEndpointsInformer bool) *ControllerContext {
func NewControllerContext(
kubeClient kubernetes.Interface,
backendConfigClient backendconfigclient.Interface,
namespace string,
resyncPeriod time.Duration,
enableEndpointsInformer bool) *ControllerContext {

newIndexer := func() cache.Indexers {
return cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
}
Expand All @@ -61,6 +70,9 @@ func NewControllerContext(kubeClient kubernetes.Interface, namespace string, res
if enableEndpointsInformer {
context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, newIndexer())
}
if backendConfigClient != nil {
context.BackendConfigInformer = informerbackendconfig.NewBackendConfigInformer(backendConfigClient, namespace, resyncPeriod, newIndexer())
}

return context
}
Expand All @@ -76,6 +88,9 @@ func (ctx *ControllerContext) HasSynced() bool {
if ctx.EndpointInformer != nil {
funcs = append(funcs, ctx.EndpointInformer.HasSynced)
}
if ctx.BackendConfigInformer != nil {
funcs = append(funcs, ctx.BackendConfigInformer.HasSynced)
}
for _, f := range funcs {
if !f() {
return false
Expand Down Expand Up @@ -109,4 +124,7 @@ func (ctx *ControllerContext) Start(stopCh chan struct{}) {
if ctx.EndpointInformer != nil {
go ctx.EndpointInformer.Run(stopCh)
}
if ctx.BackendConfigInformer != nil {
go ctx.BackendConfigInformer.Run(stopCh)
}
}
70 changes: 59 additions & 11 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
backendconfigv1alpha1 "k8s.io/ingress-gce/pkg/apis/backendconfig/v1alpha1"
backendconfig "k8s.io/ingress-gce/pkg/backendconfig"

"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
Expand Down Expand Up @@ -61,11 +63,11 @@ type LoadBalancerController struct {
client kubernetes.Interface
ctx *context.ControllerContext

ingLister StoreToIngressLister
nodeLister cache.Indexer
nodes *NodeController
ingLister StoreToIngressLister
// endpoint lister is needed when translating service target port to real endpoint target ports.
endpointLister StoreToEndpointLister
nodeLister cache.Indexer
nodes *NodeController

// TODO: Watch secrets
CloudClusterManager *ClusterManager
Expand All @@ -91,7 +93,14 @@ type LoadBalancerController struct {
// - clusterManager: A ClusterManager capable of creating all cloud resources
// required for L7 loadbalancing.
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan struct{}, ctx *context.ControllerContext, clusterManager *ClusterManager, negEnabled bool) (*LoadBalancerController, error) {
func NewLoadBalancerController(
kubeClient kubernetes.Interface,
stopCh chan struct{},
ctx *context.ControllerContext,
clusterManager *ClusterManager,
negEnabled bool,
backendConfigEnabled bool) (*LoadBalancerController, error) {

broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
Expand All @@ -114,7 +123,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru
lbc.endpointLister.Indexer = ctx.EndpointInformer.GetIndexer()
}

// ingress event handler
// Ingress event handlers.
ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
Expand Down Expand Up @@ -152,17 +161,30 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru
},
})

// service event handler
// Service event handlers.
ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: lbc.enqueueIngressForService,
AddFunc: lbc.enqueueIngressForObject,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
lbc.enqueueIngressForService(cur)
lbc.enqueueIngressForObject(cur)
}
},
// Ingress deletes matter, service deletes don't.
})

// BackendConfig event handlers.
if backendConfigEnabled {
ctx.BackendConfigInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: lbc.enqueueIngressForObject,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
lbc.enqueueIngressForObject(cur)
}
},
DeleteFunc: lbc.enqueueIngressForObject,
})
}

var endpointIndexer cache.Indexer
if ctx.EndpointInformer != nil {
endpointIndexer = ctx.EndpointInformer.GetIndexer()
Expand All @@ -180,9 +202,24 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru
return &lbc, nil
}

// enqueueIngressForService enqueues all the Ingress' for a Service.
func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
svc := obj.(*apiv1.Service)
// enqueueIngressForObject enqueues Ingresses that are associated with the
// passed in object. It is a wrapper around functions which do the actual
// work of enqueueing Ingresses based on a typed object.
func (lbc *LoadBalancerController) enqueueIngressForObject(obj interface{}) {
switch obj.(type) {
case *apiv1.Service:
svc := obj.(*apiv1.Service)
lbc.enqueueIngressForService(svc)
case *backendconfigv1alpha1.BackendConfig:
beConfig := obj.(*backendconfigv1alpha1.BackendConfig)
lbc.enqueueIngressForBackendConfig(beConfig)
default:
// Do nothing.
}
}

// enqueueIngressForService enqueues all the Ingresses for a Service.
func (lbc *LoadBalancerController) enqueueIngressForService(svc *apiv1.Service) {
ings, err := lbc.ingLister.GetServiceIngress(svc)
if err != nil {
glog.V(5).Infof("ignoring service %v: %v", svc.Name, err)
Expand All @@ -196,6 +233,17 @@ func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
}
}

// enqueueIngressForBackendConfig enqueues all Ingresses for a BackendConfig.
func (lbc *LoadBalancerController) enqueueIngressForBackendConfig(beConfig *backendconfigv1alpha1.BackendConfig) {
// Get all the Services associated with this BackendConfig.
svcLister := lbc.ctx.ServiceInformer.GetIndexer()
linkedSvcs := backendconfig.GetServicesForBackendConfig(svcLister, beConfig)
// Enqueue all the Ingresses associated with each Service.
for _, svc := range linkedSvcs {
lbc.enqueueIngressForService(svc)
}
}

// Run starts the loadbalancer controller.
func (lbc *LoadBalancerController) Run() {
glog.Infof("Starting loadbalancer controller")
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes/fake"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"

"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
Expand All @@ -53,9 +54,10 @@ func defaultBackendName(clusterName string) string {
// newLoadBalancerController create a loadbalancer controller.
func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController {
kubeClient := fake.NewSimpleClientset()
backendConfigClient := backendconfigclient.NewSimpleClientset()
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second, true)
lb, err := NewLoadBalancerController(kubeClient, stopCh, ctx, cm.ClusterManager, true)
ctx := context.NewControllerContext(kubeClient, backendConfigClient, api_v1.NamespaceAll, 1*time.Second, true)
lb, err := NewLoadBalancerController(kubeClient, stopCh, ctx, cm.ClusterManager, true, true)
if err != nil {
t.Fatalf("%v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/translator/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"

"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
Expand All @@ -43,6 +44,7 @@ var (

func fakeTranslator(negEnabled bool) *Translator {
client := fake.NewSimpleClientset()
backendConfigClient := backendconfigclient.NewSimpleClientset()
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
Expand All @@ -51,7 +53,7 @@ func fakeTranslator(negEnabled bool) *Translator {

namer := utils.NewNamer("uid1", "fw1")

ctx := context.NewControllerContext(client, apiv1.NamespaceAll, 1*time.Second, negEnabled)
ctx := context.NewControllerContext(client, backendConfigClient, apiv1.NamespaceAll, 1*time.Second, negEnabled)
gce := &Translator{
recorders: ctx,
namer: namer,
Expand Down
4 changes: 3 additions & 1 deletion pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/ingress-gce/pkg/annotations"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/utils"

"k8s.io/apimachinery/pkg/util/intstr"
)

func newTestController(kubeClient kubernetes.Interface) *Controller {
context := context.NewControllerContext(kubeClient, apiv1.NamespaceAll, 1*time.Second, true)
backendConfigClient := backendconfigclient.NewSimpleClientset()
context := context.NewControllerContext(kubeClient, backendConfigClient, apiv1.NamespaceAll, 1*time.Second, true)
controller, _ := NewController(kubeClient,
NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"),
context,
Expand Down
4 changes: 3 additions & 1 deletion pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/utils"
)
Expand All @@ -36,7 +37,8 @@ const (
)

func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager {
context := context.NewControllerContext(kubeClient, apiv1.NamespaceAll, 1*time.Second, true)
backendConfigClient := backendconfigclient.NewSimpleClientset()
context := context.NewControllerContext(kubeClient, backendConfigClient, apiv1.NamespaceAll, 1*time.Second, true)
manager := newSyncerManager(
utils.NewNamer(CluseterID, ""),
record.NewFakeRecorder(100),
Expand Down
4 changes: 3 additions & 1 deletion pkg/neg/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
"k8s.io/ingress-gce/pkg/context"
)

Expand All @@ -23,7 +24,8 @@ const (

func NewTestSyncer() *syncer {
kubeClient := fake.NewSimpleClientset()
context := context.NewControllerContext(kubeClient, apiv1.NamespaceAll, 1*time.Second, true)
backendConfigClient := backendconfigclient.NewSimpleClientset()
context := context.NewControllerContext(kubeClient, backendConfigClient, apiv1.NamespaceAll, 1*time.Second, true)
svcPort := servicePort{
namespace: testServiceNamespace,
name: testServiceName,
Expand Down

0 comments on commit aa2a191

Please sign in to comment.