Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event handlers for BackendConfig #268

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/kubernetes"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"

"k8s.io/ingress-gce/pkg/context"
Expand Down Expand Up @@ -72,6 +73,7 @@ func main() {
glog.Fatalf("Failed to create kubernetes client: %v", err)
}

var backendConfigClient backendconfigclient.Interface
if flags.F.EnableBackendConfig {
crdClient, err := crdclient.NewForConfig(kubeConfig)
if err != nil {
Expand All @@ -81,6 +83,11 @@ func main() {
if _, err := backendconfig.EnsureCRD(crdClient); err != nil {
glog.Fatalf("Failed to ensure BackendConfig CRD: %v", err)
}

backendConfigClient, err = backendconfigclient.NewForConfig(kubeConfig)
if err != nil {
glog.Fatalf("Failed to create BackendConfig client: %v", err)
}
}

namer, err := app.NewNamer(kubeClient, flags.F.ClusterName, controller.DefaultFirewallName)
Expand All @@ -97,8 +104,8 @@ func main() {

enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG)
lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG)
ctx := context.NewControllerContext(kubeClient, backendConfigClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG)
lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG, flags.F.EnableBackendConfig)
if err != nil {
glog.Fatalf("Error creating load balancer controller: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/backendconfig/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type BackendConfig struct {

// BackendConfigSpec is the spec for a BackendConfig resource
type BackendConfigSpec struct {
Iap *IAPConfig
Cdn *CDNConfig
}

// BackendConfigStatus is the status for a BackendConfig resource
Expand Down
30 changes: 24 additions & 6 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,33 @@ import (
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned"
informerbackendconfig "k8s.io/ingress-gce/pkg/backendconfig/client/informers/externalversions/backendconfig/v1alpha1"
)

// ControllerContext holds
type ControllerContext struct {
kubeClient kubernetes.Interface

IngressInformer cache.SharedIndexInformer
ServiceInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
NodeInformer cache.SharedIndexInformer
EndpointInformer cache.SharedIndexInformer
IngressInformer cache.SharedIndexInformer
ServiceInformer cache.SharedIndexInformer
BackendConfigInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
NodeInformer cache.SharedIndexInformer
EndpointInformer cache.SharedIndexInformer

// Map of namespace => record.EventRecorder.
recorders map[string]record.EventRecorder
}

// NewControllerContext returns a new shared set of informers.
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, enableEndpointsInformer bool) *ControllerContext {
func NewControllerContext(
kubeClient kubernetes.Interface,
backendConfigClient backendconfigclient.Interface,
namespace string,
resyncPeriod time.Duration,
enableEndpointsInformer bool) *ControllerContext {

newIndexer := func() cache.Indexers {
return cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
}
Expand All @@ -61,6 +70,9 @@ func NewControllerContext(kubeClient kubernetes.Interface, namespace string, res
if enableEndpointsInformer {
context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, newIndexer())
}
if backendConfigClient != nil {
context.BackendConfigInformer = informerbackendconfig.NewBackendConfigInformer(backendConfigClient, namespace, resyncPeriod, newIndexer())
}

return context
}
Expand All @@ -76,6 +88,9 @@ func (ctx *ControllerContext) HasSynced() bool {
if ctx.EndpointInformer != nil {
funcs = append(funcs, ctx.EndpointInformer.HasSynced)
}
if ctx.BackendConfigInformer != nil {
funcs = append(funcs, ctx.BackendConfigInformer.HasSynced)
}
for _, f := range funcs {
if !f() {
return false
Expand Down Expand Up @@ -109,4 +124,7 @@ func (ctx *ControllerContext) Start(stopCh chan struct{}) {
if ctx.EndpointInformer != nil {
go ctx.EndpointInformer.Run(stopCh)
}
if ctx.BackendConfigInformer != nil {
go ctx.BackendConfigInformer.Run(stopCh)
}
}
70 changes: 59 additions & 11 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
backendconfigv1alpha1 "k8s.io/ingress-gce/pkg/apis/backendconfig/v1alpha1"
backendconfig "k8s.io/ingress-gce/pkg/backendconfig"

"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
Expand Down Expand Up @@ -61,11 +63,11 @@ type LoadBalancerController struct {
client kubernetes.Interface
ctx *context.ControllerContext

ingLister StoreToIngressLister
nodeLister cache.Indexer
nodes *NodeController
ingLister StoreToIngressLister
// endpoint lister is needed when translating service target port to real endpoint target ports.
endpointLister StoreToEndpointLister
nodeLister cache.Indexer
nodes *NodeController

// TODO: Watch secrets
CloudClusterManager *ClusterManager
Expand All @@ -91,7 +93,14 @@ type LoadBalancerController struct {
// - clusterManager: A ClusterManager capable of creating all cloud resources
// required for L7 loadbalancing.
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan struct{}, ctx *context.ControllerContext, clusterManager *ClusterManager, negEnabled bool) (*LoadBalancerController, error) {
func NewLoadBalancerController(
kubeClient kubernetes.Interface,
stopCh chan struct{},
ctx *context.ControllerContext,
clusterManager *ClusterManager,
negEnabled bool,
backendConfigEnabled bool) (*LoadBalancerController, error) {

broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
Expand All @@ -114,7 +123,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru
lbc.endpointLister.Indexer = ctx.EndpointInformer.GetIndexer()
}

// ingress event handler
// Ingress event handlers.
ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
Expand Down Expand Up @@ -152,17 +161,30 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru
},
})

// service event handler
// Service event handlers.
ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: lbc.enqueueIngressForService,
AddFunc: lbc.enqueueIngressForObject,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
lbc.enqueueIngressForService(cur)
lbc.enqueueIngressForObject(cur)
}
},
// Ingress deletes matter, service deletes don't.
})

// BackendConfig event handlers.
if backendConfigEnabled {
ctx.BackendConfigInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: lbc.enqueueIngressForObject,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
lbc.enqueueIngressForObject(cur)
}
},
DeleteFunc: lbc.enqueueIngressForObject,
})
}

var endpointIndexer cache.Indexer
if ctx.EndpointInformer != nil {
endpointIndexer = ctx.EndpointInformer.GetIndexer()
Expand All @@ -180,9 +202,24 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru
return &lbc, nil
}

// enqueueIngressForService enqueues all the Ingress' for a Service.
func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
svc := obj.(*apiv1.Service)
// enqueueIngressForObject enqueues Ingresses that are associated with the
// passed in object. It is a wrapper around functions which do the actual
// work of enqueueing Ingresses based on a typed object.
func (lbc *LoadBalancerController) enqueueIngressForObject(obj interface{}) {
switch obj.(type) {
case *apiv1.Service:
svc := obj.(*apiv1.Service)
lbc.enqueueIngressForService(svc)
case *backendconfigv1alpha1.BackendConfig:
beConfig := obj.(*backendconfigv1alpha1.BackendConfig)
lbc.enqueueIngressForBackendConfig(beConfig)
default:
// Do nothing.
}
}

// enqueueIngressForService enqueues all the Ingresses for a Service.
func (lbc *LoadBalancerController) enqueueIngressForService(svc *apiv1.Service) {
ings, err := lbc.ingLister.GetServiceIngress(svc)
if err != nil {
glog.V(5).Infof("ignoring service %v: %v", svc.Name, err)
Expand All @@ -196,6 +233,17 @@ func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
}
}

// enqueueIngressForBackendConfig enqueues all Ingresses for a BackendConfig.
func (lbc *LoadBalancerController) enqueueIngressForBackendConfig(beConfig *backendconfigv1alpha1.BackendConfig) {
// Get all the Services associated with this BackendConfig.
svcLister := lbc.ctx.ServiceInformer.GetIndexer()
linkedSvcs := backendconfig.GetServicesForBackendConfig(svcLister, beConfig)
// Enqueue all the Ingresses associated with each Service.
for _, svc := range linkedSvcs {
lbc.enqueueIngressForService(svc)
}
}

// Run starts the loadbalancer controller.
func (lbc *LoadBalancerController) Run() {
glog.Infof("Starting loadbalancer controller")
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes/fake"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"

"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
Expand All @@ -53,9 +54,10 @@ func defaultBackendName(clusterName string) string {
// newLoadBalancerController create a loadbalancer controller.
func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController {
kubeClient := fake.NewSimpleClientset()
backendConfigClient := backendconfigclient.NewSimpleClientset()
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second, true)
lb, err := NewLoadBalancerController(kubeClient, stopCh, ctx, cm.ClusterManager, true)
ctx := context.NewControllerContext(kubeClient, backendConfigClient, api_v1.NamespaceAll, 1*time.Second, true)
lb, err := NewLoadBalancerController(kubeClient, stopCh, ctx, cm.ClusterManager, true, true)
if err != nil {
t.Fatalf("%v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/translator/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"

"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
Expand All @@ -43,6 +44,7 @@ var (

func fakeTranslator(negEnabled bool) *Translator {
client := fake.NewSimpleClientset()
backendConfigClient := backendconfigclient.NewSimpleClientset()
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
Expand All @@ -51,7 +53,7 @@ func fakeTranslator(negEnabled bool) *Translator {

namer := utils.NewNamer("uid1", "fw1")

ctx := context.NewControllerContext(client, apiv1.NamespaceAll, 1*time.Second, negEnabled)
ctx := context.NewControllerContext(client, backendConfigClient, apiv1.NamespaceAll, 1*time.Second, negEnabled)
gce := &Translator{
recorders: ctx,
namer: namer,
Expand Down
4 changes: 3 additions & 1 deletion pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/ingress-gce/pkg/annotations"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/utils"

"k8s.io/apimachinery/pkg/util/intstr"
)

func newTestController(kubeClient kubernetes.Interface) *Controller {
context := context.NewControllerContext(kubeClient, apiv1.NamespaceAll, 1*time.Second, true)
backendConfigClient := backendconfigclient.NewSimpleClientset()
context := context.NewControllerContext(kubeClient, backendConfigClient, apiv1.NamespaceAll, 1*time.Second, true)
controller, _ := NewController(kubeClient,
NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"),
context,
Expand Down
4 changes: 3 additions & 1 deletion pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/utils"
)
Expand All @@ -36,7 +37,8 @@ const (
)

func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager {
context := context.NewControllerContext(kubeClient, apiv1.NamespaceAll, 1*time.Second, true)
backendConfigClient := backendconfigclient.NewSimpleClientset()
context := context.NewControllerContext(kubeClient, backendConfigClient, apiv1.NamespaceAll, 1*time.Second, true)
manager := newSyncerManager(
utils.NewNamer(CluseterID, ""),
record.NewFakeRecorder(100),
Expand Down
4 changes: 3 additions & 1 deletion pkg/neg/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
"k8s.io/ingress-gce/pkg/context"
)

Expand All @@ -23,7 +24,8 @@ const (

func NewTestSyncer() *syncer {
kubeClient := fake.NewSimpleClientset()
context := context.NewControllerContext(kubeClient, apiv1.NamespaceAll, 1*time.Second, true)
backendConfigClient := backendconfigclient.NewSimpleClientset()
context := context.NewControllerContext(kubeClient, backendConfigClient, apiv1.NamespaceAll, 1*time.Second, true)
svcPort := servicePort{
namespace: testServiceNamespace,
name: testServiceName,
Expand Down