From 4c03bfc812e7ceabcac0979290bd74d9efc9da15 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Thu, 2 Mar 2023 08:57:24 -0800 Subject: [PATCH] feat: refactor and improve K8s sync provider (#443) ## This PR fixes #434 This is a refactoring and internal improvement for K8s ISync provider. Improvements include, - Reduce K8s API load by utilizing Informer cache - Yet, provide a fallback if cache miss occurs (Note - we rely on K8s Informer for cache refill and consistency) - Informer now only watches a specific namespace (compared to **\***) - this is a potential performance improvement and a security improvement - Reduced informer handlers with extracted common logics - Unit tests where possible --------- Signed-off-by: Kavindu Dodanduwa --- go.mod | 2 +- go.sum | 13 - pkg/sync/kubernetes/kubernetes_sync.go | 272 +++++---- pkg/sync/kubernetes/kubernetes_sync_test.go | 611 ++++++++++++++++++++ 4 files changed, 765 insertions(+), 133 deletions(-) create mode 100644 pkg/sync/kubernetes/kubernetes_sync_test.go diff --git a/go.mod b/go.mod index 36c65d80c..b8d301b1d 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,6 @@ require ( google.golang.org/grpc v1.53.0 google.golang.org/protobuf v1.28.1 gopkg.in/yaml.v3 v3.0.1 - k8s.io/api v0.26.2 k8s.io/apimachinery v0.26.2 k8s.io/client-go v0.26.2 sigs.k8s.io/controller-runtime v0.14.5 @@ -115,6 +114,7 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + k8s.io/api v0.26.2 // indirect k8s.io/apiextensions-apiserver v0.26.1 // indirect k8s.io/component-base v0.26.1 // indirect k8s.io/klog/v2 v2.90.0 // indirect diff --git a/go.sum b/go.sum index dbcd2418f..3f25c0628 100644 --- a/go.sum +++ b/go.sum @@ -285,18 +285,12 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.6.0 h1:9t9b9vRUbFq3C4qKFCGkVuq/fIHji802N1nrtkh1mNc= github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E= -github.com/open-feature/go-sdk v1.1.0 h1:JOOa0AleJFUvnWoF9KWdLqYosi5fDIRBDzPYZPr5qgM= -github.com/open-feature/go-sdk v1.1.0/go.mod h1:R8QJmLdSHFaRdrWtwmp5bVK35Q+O/cEGtYaiy6NM6kc= github.com/open-feature/go-sdk v1.2.0 h1:2xsUgNUUDITpryB9nFS43CI9gAF415I1He22Q1d4+Po= github.com/open-feature/go-sdk v1.2.0/go.mod h1:UQJJXUptk92An4F6so2Vd0iRo6EEZ+QGa7HVyQ/GPi0= -github.com/open-feature/go-sdk-contrib/providers/flagd v0.1.7 h1:0s8reX/EfCNV37PsGSr55wUpppPtyp0jZKeuVAaWZ+4= -github.com/open-feature/go-sdk-contrib/providers/flagd v0.1.7/go.mod h1:dHB0hsYykZ1Un+CdnWErqLqUQswUADIvDg2VwDLx7gs= github.com/open-feature/go-sdk-contrib/providers/flagd v0.1.9 h1:hHa7sjOzohj9ZhYR6ym+Xjk517ogb4q2QIE6ztdLZMg= github.com/open-feature/go-sdk-contrib/providers/flagd v0.1.9/go.mod h1:IibpAPNmtUIJsJA6T4X1IcD4+BG1hCLw86luG8YQcqA= github.com/open-feature/go-sdk-contrib/tests/flagd v1.2.1 h1:Tg712Egcqb5dsYxOGEaQbfD3g1mqPFdV4tSmKKKxDPk= github.com/open-feature/go-sdk-contrib/tests/flagd v1.2.1/go.mod h1:zw/xpuDy9ziBEKVA1t4VoQtzFc80btAAQCiZkX6y9oQ= -github.com/open-feature/open-feature-operator v0.2.28 h1:qzzVq8v9G7aXO7luocO/wQCGnTJjtcQh75mDOqjnFxo= -github.com/open-feature/open-feature-operator v0.2.28/go.mod h1:bQncVK7hvhj5QStPwexxQ1aArPwox2Y1vWrVei/qIFg= github.com/open-feature/open-feature-operator v0.2.29 h1:Ky/SMzwEiBV5x9qOfHTj1jl/CakPZNClRtoeSPqVbNo= github.com/open-feature/open-feature-operator v0.2.29/go.mod h1:bQncVK7hvhj5QStPwexxQ1aArPwox2Y1vWrVei/qIFg= github.com/open-feature/schemas v0.2.8 h1:oA75hJXpOd9SFgmNI2IAxWZkwzQPUDm7Jyyh3q489wM= @@ -359,7 +353,6 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= @@ -756,18 +749,12 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -k8s.io/api v0.26.1 h1:f+SWYiPd/GsiWwVRz+NbFyCgvv75Pk9NK6dlkZgpCRQ= -k8s.io/api v0.26.1/go.mod h1:xd/GBNgR0f707+ATNyPmQ1oyKSgndzXij81FzWGsejg= k8s.io/api v0.26.2 h1:dM3cinp3PGB6asOySalOZxEG4CZ0IAdJsrYZXE/ovGQ= k8s.io/api v0.26.2/go.mod h1:1kjMQsFE+QHPfskEcVNgL3+Hp88B80uj0QtSOlj8itU= k8s.io/apiextensions-apiserver v0.26.1 h1:cB8h1SRk6e/+i3NOrQgSFij1B2S0Y0wDoNl66bn8RMI= k8s.io/apiextensions-apiserver v0.26.1/go.mod h1:AptjOSXDGuE0JICx/Em15PaoO7buLwTs0dGleIHixSM= -k8s.io/apimachinery v0.26.1 h1:8EZ/eGJL+hY/MYCNwhmDzVqq2lPl3N3Bo8rvweJwXUQ= -k8s.io/apimachinery v0.26.1/go.mod h1:tnPmbONNJ7ByJNz9+n9kMjNP8ON+1qoAIIC70lztu74= k8s.io/apimachinery v0.26.2 h1:da1u3D5wfR5u2RpLhE/ZtZS2P7QvDgLZTi9wrNZl/tQ= k8s.io/apimachinery v0.26.2/go.mod h1:ats7nN1LExKHvJ9TmwootT00Yz05MuYqPXEXaVeOy5I= -k8s.io/client-go v0.26.1 h1:87CXzYJnAMGaa/IDDfRdhTzxk/wzGZ+/HUQpqgVSZXU= -k8s.io/client-go v0.26.1/go.mod h1:IWNSglg+rQ3OcvDkhY6+QLeasV4OYHDjdqeWkDQZwGE= k8s.io/client-go v0.26.2 h1:s1WkVujHX3kTp4Zn4yGNFK+dlDXy1bAAkIl+cFAiuYI= k8s.io/client-go v0.26.2/go.mod h1:u5EjOuSyBa09yqqyY7m3abZeovO/7D/WehVVlZ2qcqU= k8s.io/component-base v0.26.1 h1:4ahudpeQXHZL5kko+iDHqLj/FSGAEUnSVO0EBbgDd+4= diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index c06195bd2..6d2378418 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -2,16 +2,15 @@ package kubernetes import ( "context" - "errors" "fmt" "os" "strings" + msync "sync" "time" "github.com/open-feature/flagd/pkg/logger" "github.com/open-feature/flagd/pkg/sync" "github.com/open-feature/open-feature-operator/apis/core/v1alpha1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" @@ -23,15 +22,22 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -var resyncPeriod = 1 * time.Minute +var ( + resyncPeriod = 1 * time.Minute + apiVersion = fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version) +) type Sync struct { Logger *logger.Logger ProviderArgs sync.ProviderArgs - client client.Client URI string - Source string - ready bool + + Source string + ready bool + namespace string + crdName string + readClient client.Reader + informer cache.SharedInformer } func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { @@ -44,16 +50,49 @@ func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error } func (k *Sync) Init(ctx context.Context) error { - // noop + var err error + + k.namespace, k.crdName, err = parseURI(k.URI) + if err != nil { + return err + } + + if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil { + return err + } + clusterConfig, err := k8sClusterConfig() + if err != nil { + return err + } + + k.readClient, err = client.New(clusterConfig, client.Options{Scheme: scheme.Scheme}) + if err != nil { + return err + } + + dynamicClient, err := dynamic.NewForConfig(clusterConfig) + if err != nil { + return err + } + + resource := v1alpha1.GroupVersion.WithResource("featureflagconfigurations") + + // The created informer will not do resyncs if the given defaultEventHandlerResyncPeriod is zero. + // For more details on resync implications refer to tools/cache/shared_informer.go + factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, resyncPeriod, k.namespace, nil) + + k.informer = factory.ForResource(resource).Informer() + return nil } func (k *Sync) IsReady() bool { - // we cannot reliably check external HTTP(s) sources return k.ready } func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { + k.Logger.Info(fmt.Sprintf("starting kubernetes sync notifier for resource: %s", k.URI)) + // Initial fetch fetch, err := k.fetch(ctx) if err != nil { @@ -65,12 +104,31 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { notifies := make(chan INotify) - go k.notify(ctx, notifies) + var wg msync.WaitGroup + + // Start K8s resource notifier + wg.Add(1) + go func() { + defer wg.Done() + k.notify(ctx, notifies) + }() + + // Start notifier watcher + wg.Add(1) + go func() { + defer wg.Done() + k.watcher(ctx, notifies, dataSync) + }() + + wg.Wait() + return nil +} +func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan<- sync.DataSync) { for { select { case <-ctx.Done(): - return nil + return case w := <-notifies: switch w.GetEvent().EventType { case DefaultEventTypeCreate: @@ -86,7 +144,7 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { k.Logger.Debug("Configuration modified") msg, err := k.fetch(ctx) if err != nil { - k.Logger.Error(fmt.Sprintf("error fetching after write notification: %s", err.Error())) + k.Logger.Error(fmt.Sprintf("error fetching after update notification: %s", err.Error())) continue } @@ -101,101 +159,47 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } } +// fetch attempts to retrieve the latest feature flag configurations func (k *Sync) fetch(ctx context.Context) (string, error) { - if k.URI == "" { - k.Logger.Error("no target feature flag configuration set") - return "{}", nil - } - - ns, name, err := parseURI(k.URI) + // first check the store - avoid overloading API + item, exist, err := k.informer.GetStore().GetByKey(k.URI) if err != nil { - k.Logger.Error(err.Error()) - return "{}", nil + return "", err } - if k.client == nil { - k.Logger.Warn("client not initialised") - return "{}", nil + if exist { + configuration, err := toFFCfg(item) + if err != nil { + return "", err + } + + k.Logger.Debug(fmt.Sprintf("resource %s served from the informer cache", k.URI)) + return configuration.Spec.FeatureFlagSpec, nil } + // fallback to API access - this is an informer cache miss. Could happen at the startup where cache is not filled var ff v1alpha1.FeatureFlagConfiguration - err = k.client.Get(ctx, client.ObjectKey{ - Name: name, - Namespace: ns, + err = k.readClient.Get(ctx, client.ObjectKey{ + Name: k.crdName, + Namespace: k.namespace, }, &ff) - - return ff.Spec.FeatureFlagSpec, err -} - -func parseURI(uri string) (string, string, error) { - s := strings.Split(uri, "/") - if len(s) != 2 { - return "", "", fmt.Errorf("invalid uri received: %s", uri) - } - return s[0], s[1], nil -} - -func (k *Sync) buildConfiguration() (*rest.Config, error) { - kubeconfig := os.Getenv("KUBECONFIG") - var clusterConfig *rest.Config - var err error - if kubeconfig != "" { - clusterConfig, err = clientcmd.BuildConfigFromFlags("", kubeconfig) - } else { - clusterConfig, err = rest.InClusterConfig() - } if err != nil { - return nil, err + return "", err } - return clusterConfig, nil + k.Logger.Debug(fmt.Sprintf("resource %s served from API server", k.URI)) + return ff.Spec.FeatureFlagSpec, nil } -//nolint:funlen func (k *Sync) notify(ctx context.Context, c chan<- INotify) { - if k.URI == "" { - k.Logger.Error("No target feature flag configuration set") - return - } - ns, name, err := parseURI(k.URI) - if err != nil { - k.Logger.Error(err.Error()) - return - } - k.Logger.Info( - fmt.Sprintf("starting kubernetes sync notifier for resource: %s", - k.URI, - ), - ) - clusterConfig, err := k.buildConfiguration() - if err != nil { - k.Logger.Error(fmt.Sprintf("error building configuration: %s", err)) - } - if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil { - k.Logger.Fatal(err.Error()) - } - k.client, err = client.New(clusterConfig, client.Options{Scheme: scheme.Scheme}) - if err != nil { - k.Logger.Fatal(err.Error()) - } - clusterClient, err := dynamic.NewForConfig(clusterConfig) - if err != nil { - k.Logger.Fatal(err.Error()) - } - resource := v1alpha1.GroupVersion.WithResource("featureflagconfigurations") - // The created informer will not do resyncs if the given defaultEventHandlerResyncPeriod is zero. - // For more details on resync implications refer to tools/cache/shared_informer.go - factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(clusterClient, - resyncPeriod, corev1.NamespaceAll, nil) - informer := factory.ForResource(resource).Informer() objectKey := client.ObjectKey{ - Name: name, - Namespace: ns, + Name: k.crdName, + Namespace: k.namespace, } - if _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + if _, err := k.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { k.Logger.Info(fmt.Sprintf("kube sync notifier event: add: %s %s", objectKey.Namespace, objectKey.Name)) - if err := createFuncHandler(obj, objectKey, c); err != nil { + if err := commonHandler(obj, objectKey, DefaultEventTypeCreate, c); err != nil { k.Logger.Warn(err.Error()) } }, @@ -207,7 +211,7 @@ func (k *Sync) notify(ctx context.Context, c chan<- INotify) { }, DeleteFunc: func(obj interface{}) { k.Logger.Info(fmt.Sprintf("kube sync notifier event: delete: %s %s", objectKey.Namespace, objectKey.Name)) - if err := deleteFuncHandler(obj, objectKey, c); err != nil { + if err := commonHandler(obj, objectKey, DefaultEventTypeDelete, c); err != nil { k.Logger.Warn(err.Error()) } }, @@ -220,48 +224,52 @@ func (k *Sync) notify(ctx context.Context, c chan<- INotify) { EventType: DefaultEventTypeReady, }, } - informer.Run(ctx.Done()) + + k.informer.Run(ctx.Done()) } -func createFuncHandler(obj interface{}, object client.ObjectKey, c chan<- INotify) error { - var ffObj v1alpha1.FeatureFlagConfiguration - u := obj.(*unstructured.Unstructured) - err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &ffObj) +// commonHandler emits the desired event if and only if handler receive an object matching apiVersion and resource name +func commonHandler(obj interface{}, object client.ObjectKey, emitEvent DefaultEventType, c chan<- INotify) error { + ffObj, err := toFFCfg(obj) if err != nil { return err } - if ffObj.APIVersion != fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version) { - return errors.New("invalid api version") + + if ffObj.APIVersion != apiVersion { + return fmt.Errorf("invalid api version %s, expected %s", ffObj.APIVersion, apiVersion) } + if ffObj.Name == object.Name { c <- &Notifier{ Event: Event[DefaultEventType]{ - EventType: DefaultEventTypeCreate, + EventType: emitEvent, }, } } + return nil } +// updateFuncHandler handles updates. Event is emitted if and only if resource name, apiVersion of old & new are equal func updateFuncHandler(oldObj interface{}, newObj interface{}, object client.ObjectKey, c chan<- INotify) error { - var ffOldObj v1alpha1.FeatureFlagConfiguration - u := oldObj.(*unstructured.Unstructured) - err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &ffOldObj) + ffOldObj, err := toFFCfg(oldObj) if err != nil { return err } - if ffOldObj.APIVersion != fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version) { - return errors.New("invalid api version") + + if ffOldObj.APIVersion != apiVersion { + return fmt.Errorf("invalid api version %s, expected %s", ffOldObj.APIVersion, apiVersion) } - var ffNewObj v1alpha1.FeatureFlagConfiguration - u = newObj.(*unstructured.Unstructured) - err = runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &ffNewObj) + + ffNewObj, err := toFFCfg(newObj) if err != nil { return err } - if ffNewObj.APIVersion != fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version) { - return errors.New("invalid api version") + + if ffNewObj.APIVersion != apiVersion { + return fmt.Errorf("invalid api version %s, expected %s", ffNewObj.APIVersion, apiVersion) } + if object.Name == ffNewObj.Name && ffOldObj.ResourceVersion != ffNewObj.ResourceVersion { // Only update if there is an actual featureFlagSpec change c <- &Notifier{ @@ -273,22 +281,48 @@ func updateFuncHandler(oldObj interface{}, newObj interface{}, object client.Obj return nil } -func deleteFuncHandler(obj interface{}, object client.ObjectKey, c chan<- INotify) error { +// toFFCfg attempts to covert unstructured payload to configurations +func toFFCfg(object interface{}) (*v1alpha1.FeatureFlagConfiguration, error) { + u, ok := object.(*unstructured.Unstructured) + if !ok { + return nil, fmt.Errorf("provided value is not of type *unstructured.Unstructured") + } + var ffObj v1alpha1.FeatureFlagConfiguration - u := obj.(*unstructured.Unstructured) err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &ffObj) if err != nil { - return err + return nil, err } - if ffObj.APIVersion != fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version) { - return errors.New("invalid api version") + + return &ffObj, nil +} + +// parseURI parse provided uri in the format of / to namespace, crdName. Results in an error +// for invalid format or failed parsing +func parseURI(uri string) (string, string, error) { + s := strings.Split(uri, "/") + if len(s) != 2 || len(s[0]) == 0 || len(s[1]) == 0 { + return "", "", fmt.Errorf("invalid resource uri format, expected / but got: %s", uri) } - if ffObj.Name == object.Name { - c <- &Notifier{ - Event: Event[DefaultEventType]{ - EventType: DefaultEventTypeDelete, - }, - } + return s[0], s[1], nil +} + +// k8sClusterConfig build K8s connection config based available configurations +func k8sClusterConfig() (*rest.Config, error) { + cfg := os.Getenv("KUBECONFIG") + + var clusterConfig *rest.Config + var err error + + if cfg != "" { + clusterConfig, err = clientcmd.BuildConfigFromFlags("", cfg) + } else { + clusterConfig, err = rest.InClusterConfig() } - return nil + + if err != nil { + return nil, err + } + + return clusterConfig, nil } diff --git a/pkg/sync/kubernetes/kubernetes_sync_test.go b/pkg/sync/kubernetes/kubernetes_sync_test.go new file mode 100644 index 000000000..0f92013ac --- /dev/null +++ b/pkg/sync/kubernetes/kubernetes_sync_test.go @@ -0,0 +1,611 @@ +package kubernetes + +import ( + "context" + "encoding/json" + "errors" + "reflect" + "testing" + "time" + + "github.com/open-feature/flagd/pkg/sync" + + "github.com/open-feature/flagd/pkg/logger" + "k8s.io/client-go/tools/cache" + + "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" + + "github.com/open-feature/open-feature-operator/apis/core/v1alpha1" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var Metadata = v1.TypeMeta{ + Kind: "FeatureFlagConfiguration", + APIVersion: apiVersion, +} + +func Test_parseURI(t *testing.T) { + tests := []struct { + name string + uri string + ns string + resource string + err bool + }{ + { + name: "simple success", + uri: "namespace/resource", + ns: "namespace", + resource: "resource", + err: false, + }, + { + name: "simple error - no ns", + uri: "/resource", + err: true, + }, + { + name: "simple error - no resource", + uri: "resource/", + err: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ns, rs, err := parseURI(tt.uri) + if (err != nil) != tt.err { + t.Errorf("parseURI() error = %v, wantErr %v", err, tt.err) + return + } + if ns != tt.ns { + t.Errorf("parseURI() got = %v, want %v", ns, tt.ns) + } + if rs != tt.resource { + t.Errorf("parseURI() got1 = %v, want %v", rs, tt.resource) + } + }) + } +} + +func Test_toFFCfg(t *testing.T) { + validFFCfg := v1alpha1.FeatureFlagConfiguration{ + TypeMeta: Metadata, + } + + tests := []struct { + name string + input interface{} + want *v1alpha1.FeatureFlagConfiguration + wantErr bool + }{ + { + name: "Simple success", + input: toUnstructured(t, validFFCfg), + want: &validFFCfg, + wantErr: false, + }, + { + name: "Simple error", + input: struct { + flag string + }{ + flag: "test", + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := toFFCfg(tt.input) + + if (err != nil) != tt.wantErr { + t.Errorf("toFFCfg() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("toFFCfg() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_commonHandler(t *testing.T) { + cfgNs := "resourceNS" + cfgName := "resourceName" + + validFFCfg := v1alpha1.FeatureFlagConfiguration{ + TypeMeta: Metadata, + ObjectMeta: v1.ObjectMeta{ + Namespace: cfgNs, + Name: cfgName, + }, + } + + type args struct { + obj interface{} + object client.ObjectKey + } + tests := []struct { + name string + args args + wantErr bool + wantEvent bool + eventType DefaultEventType + }{ + { + name: "simple success", + args: args{ + obj: toUnstructured(t, validFFCfg), + object: client.ObjectKey{ + Namespace: cfgNs, + Name: cfgName, + }, + }, + wantEvent: true, + wantErr: false, + }, + { + name: "simple scenario - only notify if resource name matches", + args: args{ + obj: toUnstructured(t, validFFCfg), + object: client.ObjectKey{ + Namespace: cfgNs, + Name: "SomeOtherResource", + }, + }, + wantEvent: false, + wantErr: false, + }, + { + name: "simple error - API mismatch", + args: args{ + obj: toUnstructured(t, v1alpha1.FeatureFlagConfiguration{ + TypeMeta: v1.TypeMeta{ + Kind: "FeatureFlagConfiguration", + APIVersion: "someAPIVersion", + }, + }), + object: client.ObjectKey{ + Namespace: cfgNs, + Name: cfgName, + }, + }, + wantErr: true, + wantEvent: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncChan := make(chan INotify, 1) + + err := commonHandler(tt.args.obj, tt.args.object, tt.eventType, syncChan) + if err != nil && !tt.wantErr { + t.Errorf("commonHandler() error = %v, wantErr %v", err, tt.wantErr) + } + + if tt.wantErr { + if err == nil { + t.Errorf("commonHandler() expected error but received none.") + } + + // Expected error occurred, hence continue + return + } + + if tt.wantEvent != true { + // Not interested in the event, hence ignore notification check. But check for chan writes + if len(syncChan) != 0 { + t.Errorf("commonHandler() expected no events, but events are available: %d", len(syncChan)) + } + + return + } + + // watch events with a timeout + var notify INotify + select { + case notify = <-syncChan: + case <-time.After(2 * time.Second): + t.Errorf("timedout waiting for events from commonHandler()") + } + + if notify.GetEvent().EventType != tt.eventType { + t.Errorf("commonHandler() event = %v, wanted %v", notify.GetEvent().EventType, DefaultEventTypeDelete) + } + }) + } +} + +func Test_updateFuncHandler(t *testing.T) { + cfgNs := "resourceNS" + cfgName := "resourceName" + + validFFCfgOld := v1alpha1.FeatureFlagConfiguration{ + TypeMeta: Metadata, + ObjectMeta: v1.ObjectMeta{ + Namespace: cfgNs, + Name: cfgName, + ResourceVersion: "v1", + }, + } + + validFFCfgNew := validFFCfgOld + validFFCfgNew.ResourceVersion = "v2" + + type args struct { + oldObj interface{} + newObj interface{} + object client.ObjectKey + } + tests := []struct { + name string + args args + wantErr bool + wantEvent bool + }{ + { + name: "Simple success", + args: args{ + oldObj: toUnstructured(t, validFFCfgOld), + newObj: toUnstructured(t, validFFCfgNew), + object: client.ObjectKey{ + Namespace: cfgNs, + Name: cfgName, + }, + }, + wantErr: false, + wantEvent: true, + }, + { + name: "Simple scenario - notify only if resource name match", + args: args{ + oldObj: toUnstructured(t, validFFCfgOld), + newObj: toUnstructured(t, validFFCfgNew), + object: client.ObjectKey{ + Namespace: cfgNs, + Name: "SomeOtherResource", + }, + }, + wantErr: false, + wantEvent: false, + }, + { + name: "Simple scenario - notify only if resource version is new", + args: args{ + oldObj: toUnstructured(t, validFFCfgOld), + newObj: toUnstructured(t, validFFCfgOld), + object: client.ObjectKey{ + Namespace: cfgNs, + Name: "SomeOtherResource", + }, + }, + wantErr: false, + wantEvent: false, + }, + { + name: "Simple error - API version mismatch new object", + args: args{ + oldObj: toUnstructured(t, validFFCfgOld), + newObj: toUnstructured(t, v1alpha1.FeatureFlagConfiguration{ + TypeMeta: v1.TypeMeta{ + Kind: "FeatureFlagConfiguration", + APIVersion: "someAPIVersion", + }, + }), + object: client.ObjectKey{ + Namespace: cfgNs, + Name: cfgName, + }, + }, + wantErr: true, + wantEvent: false, + }, + { + name: "Simple error - API version mismatch old object", + args: args{ + oldObj: toUnstructured(t, v1alpha1.FeatureFlagConfiguration{ + TypeMeta: v1.TypeMeta{ + Kind: "FeatureFlagConfiguration", + APIVersion: "someAPIVersion", + }, + }), + newObj: toUnstructured(t, validFFCfgNew), + object: client.ObjectKey{ + Namespace: cfgNs, + Name: cfgName, + }, + }, + wantErr: true, + wantEvent: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncChan := make(chan INotify, 1) + + err := updateFuncHandler(tt.args.oldObj, tt.args.newObj, tt.args.object, syncChan) + if err != nil && !tt.wantErr { + t.Errorf("updateFuncHandler() error = %v, wantErr %v", err, tt.wantErr) + } + + if tt.wantErr { + if err == nil { + t.Errorf("updateFuncHandler() expected error but received none.") + } + + // Expected error occurred, hence continue + return + } + + if tt.wantEvent != true { + // Not interested in the event, hence ignore notification check. But check for chan writes + if len(syncChan) != 0 { + t.Errorf("updateFuncHandler() expected no events, but events are available: %d", len(syncChan)) + } + + return + } + + // watch events with a timeout + var notify INotify + select { + case notify = <-syncChan: + case <-time.After(2 * time.Second): + t.Errorf("timedout waiting for events from updateFuncHandler()") + } + + if notify.GetEvent().EventType != DefaultEventTypeModify { + t.Errorf("updateFuncHandler() event = %v, wanted %v", notify.GetEvent().EventType, DefaultEventTypeModify) + } + }) + } +} + +func TestSync_fetch(t *testing.T) { + flagSpec := "fakeFlagSpec" + + validCfg := v1alpha1.FeatureFlagConfiguration{ + TypeMeta: Metadata, + ObjectMeta: v1.ObjectMeta{ + Namespace: "resourceNS", + Name: "resourceName", + ResourceVersion: "v1", + }, + Spec: v1alpha1.FeatureFlagConfigurationSpec{ + FeatureFlagSpec: flagSpec, + }, + } + + type args struct { + InformerGetFunc func(key string) (item interface{}, exists bool, err error) + ClientResponse v1alpha1.FeatureFlagConfiguration + ClientError error + } + + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "Scenario - get from informer cache", + args: args{ + InformerGetFunc: func(key string) (item interface{}, exists bool, err error) { + return toUnstructured(t, validCfg), true, nil + }, + }, + wantErr: false, + want: flagSpec, + }, + { + name: "Scenario - get from API if informer cache miss", + args: args{ + InformerGetFunc: func(key string) (item interface{}, exists bool, err error) { + return nil, false, nil + }, + ClientResponse: validCfg, + }, + wantErr: false, + want: flagSpec, + }, + { + name: "Scenario - error for informer cache read error", + args: args{ + InformerGetFunc: func(key string) (item interface{}, exists bool, err error) { + return nil, false, errors.New("mock error") + }, + }, + wantErr: true, + }, + { + name: "Scenario - error for API get error", + args: args{ + InformerGetFunc: func(key string) (item interface{}, exists bool, err error) { + return nil, false, nil + }, + ClientError: errors.New("mock error"), + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup with args + k := &Sync{ + informer: &MockInformer{ + fakeStore: cache.FakeCustomStore{ + GetByKeyFunc: tt.args.InformerGetFunc, + }, + }, + readClient: &MockClient{ + getResponse: tt.args.ClientResponse, + clientErr: tt.args.ClientError, + }, + Logger: logger.NewLogger(nil, false), + } + + // Test fetch + got, err := k.fetch(context.Background()) + + if (err != nil) != tt.wantErr { + t.Errorf("fetch() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if got != tt.want { + t.Errorf("fetch() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSync_watcher(t *testing.T) { + flagSpec := "fakeFlagSpec" + + validCfg := v1alpha1.FeatureFlagConfiguration{ + TypeMeta: Metadata, + ObjectMeta: v1.ObjectMeta{ + Namespace: "resourceNS", + Name: "resourceName", + ResourceVersion: "v1", + }, + Spec: v1alpha1.FeatureFlagConfigurationSpec{ + FeatureFlagSpec: flagSpec, + }, + } + + type args struct { + InformerGetFunc func(key string) (item interface{}, exists bool, err error) + notification INotify + } + + tests := []struct { + name string + args args + want string + }{ + { + name: "scenario - create event", + want: flagSpec, + args: args{ + InformerGetFunc: func(key string) (item interface{}, exists bool, err error) { + return toUnstructured(t, validCfg), true, nil + }, + notification: &Notifier{ + Event: Event[DefaultEventType]{ + EventType: DefaultEventTypeCreate, + }, + }, + }, + }, + { + name: "scenario - modify event", + want: flagSpec, + args: args{ + InformerGetFunc: func(key string) (item interface{}, exists bool, err error) { + return toUnstructured(t, validCfg), true, nil + }, + notification: &Notifier{ + Event: Event[DefaultEventType]{ + EventType: DefaultEventTypeModify, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // setup sync + k := &Sync{ + informer: &MockInformer{ + fakeStore: cache.FakeCustomStore{ + GetByKeyFunc: tt.args.InformerGetFunc, + }, + }, + Logger: logger.NewLogger(nil, false), + } + + // create communication channels with buffer to so that calls are non-blocking + notifies := make(chan INotify, 1) + dataSyncs := make(chan sync.DataSync, 1) + + // emit event + notifies <- tt.args.notification + + tCtx, cFunc := context.WithTimeout(context.Background(), 2*time.Second) + defer cFunc() + + // start watcher + go k.watcher(tCtx, notifies, dataSyncs) + + // wait for data sync + select { + case <-tCtx.Done(): + t.Errorf("timeout waiting for the results") + case dataSyncs := <-dataSyncs: + if dataSyncs.FlagData != tt.want { + t.Errorf("fetch() got = %v, want %v", dataSyncs.FlagData, tt.want) + } + } + }) + } +} + +// toUnstructured helper to convert an interface to unstructured.Unstructured +func toUnstructured(t *testing.T, obj interface{}) interface{} { + bytes, err := json.Marshal(obj) + if err != nil { + t.Errorf("test setup faulure: %s", err.Error()) + } + + var res map[string]interface{} + + err = json.Unmarshal(bytes, &res) + if err != nil { + t.Errorf("test setup faulure: %s", err.Error()) + } + + return &unstructured.Unstructured{Object: res} +} + +// Mock implementations + +// MockClient contains an embedded client.Reader for desired method overriding +type MockClient struct { + client.Reader + clientErr error + + getResponse v1alpha1.FeatureFlagConfiguration +} + +func (m MockClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + // return error if error is set + if m.clientErr != nil { + return m.clientErr + } + + // else try returning response + cfg, ok := obj.(*v1alpha1.FeatureFlagConfiguration) + if !ok { + return errors.New("must contain a pointer typed v1alpha1.FeatureFlagConfiguration") + } + + *cfg = m.getResponse + return nil +} + +// MockInformer contains an embedded controllertest.FakeInformer for desired method overriding +type MockInformer struct { + controllertest.FakeInformer + + fakeStore cache.FakeCustomStore +} + +func (m MockInformer) GetStore() cache.Store { + return &m.fakeStore +}