diff --git a/pkg/cache/multi_informer.go b/pkg/cache/multi_informer.go new file mode 100644 index 00000000000..c4b27e2bed5 --- /dev/null +++ b/pkg/cache/multi_informer.go @@ -0,0 +1,64 @@ +package cache + +import ( + "fmt" + "time" + + toolscache "k8s.io/client-go/tools/cache" +) + +type multiNamesapceIndexInformer struct { + indexInformers map[string]toolscache.SharedIndexInformer +} + +func (m *multiNamesapceIndexInformer) AddIndexers(indexers toolscache.Indexers) error { + return nil +} + +func (m *multiNamesapceIndexInformer) GetIndexer() toolscache.Indexer { + return nil +} + +func (m *multiNamesapceIndexInformer) AddEventHandler(handler toolscache.ResourceEventHandler) { + for namespace, i := range m.indexInformers { + fmt.Printf("\nadding event handler: %v for namespace: %v\n", i, namespace) + i.AddEventHandler(handler) + } +} + +func (m *multiNamesapceIndexInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) { + for _, i := range m.indexInformers { + i.AddEventHandlerWithResyncPeriod(handler, resyncPeriod) + } +} + +func (m *multiNamesapceIndexInformer) GetStore() toolscache.Store { + return nil +} + +func (m *multiNamesapceIndexInformer) GetController() toolscache.Controller { + return nil + +} + +func (m *multiNamesapceIndexInformer) Run(stopCh <-chan struct{}) { + for _, i := range m.indexInformers { + i.Run(stopCh) + } +} + +func (m *multiNamesapceIndexInformer) HasSynced() bool { + fmt.Printf("has synced") + for _, i := range m.indexInformers { + if synced := i.HasSynced(); !synced { + fmt.Printf("has synced - %v", synced) + return synced + } + } + fmt.Printf("has synced- true") + return true +} + +func (m *multiNamesapceIndexInformer) LastSyncResourceVersion() string { + return "" +} diff --git a/pkg/cache/multi_namespace.go b/pkg/cache/multi_namespace.go new file mode 100644 index 00000000000..68698becedd --- /dev/null +++ b/pkg/cache/multi_namespace.go @@ -0,0 +1,132 @@ +package cache + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + toolscache "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +// MultiNamespaceCache - allows for using multiple namespaces for the cache. +type multiNamespaceCache struct { + caches map[string]cache.Cache +} + +// NewMultiNamespaceCache - creates a new cache that can handle multiple namespaces +func NewMultiNamespaceCache(namespaces []string, mgr manager.Manager) (cache.Cache, error) { + m := multiNamespaceCache{ + caches: map[string]cache.Cache{}, + } + for _, namespace := range namespaces { + o := cache.Options{ + Scheme: mgr.GetScheme(), + Mapper: mgr.GetRESTMapper(), + Namespace: namespace, + } + + cache, err := cache.New(mgr.GetConfig(), o) + if err != nil { + return nil, err + } + m.caches[namespace] = cache + } + return &m, nil +} + +var _ cache.Cache = &multiNamespaceCache{} + +// Get - Gets the object from the cache based on the key +func (m *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj runtime.Object) error { + c, ok := m.caches[key.Namespace] + if !ok { + return fmt.Errorf("unknown namespace for cache: %v", key.Namespace) + } + return c.Get(ctx, key, obj) +} + +// List - Gets the list from the cache. +func (m *multiNamespaceCache) List(ctx context.Context, opts *client.ListOptions, list runtime.Object) error { + // TODO: we may want to assume that empty namespace means seach in all namespaces I am watching. + c, ok := m.caches[opts.Namespace] + if !ok { + return fmt.Errorf("unknown namespace for cache: %v", opts.Namespace) + } + return c.List(ctx, opts, list) +} + +// GetInformer - get an informer for an obj +func (m *multiNamespaceCache) GetInformer(obj runtime.Object) (toolscache.SharedIndexInformer, error) { + // TODO: we need to create a way to return a shared index informer that deals with multiple namespaces. + // TODO: This could just create a new index informer but use a multiNamespaced list watcher. + nsToInformer := map[string]toolscache.SharedIndexInformer{} + for ns, c := range m.caches { + var err error + nsToInformer[ns], err = c.GetInformer(obj) + if err != nil { + return nil, err + } + } + + return &multiNamesapceIndexInformer{ + indexInformers: nsToInformer, + }, nil +} + +// GetInformerForKind - get an informer for an kind +func (m *multiNamespaceCache) GetInformerForKind(gvk schema.GroupVersionKind) (toolscache.SharedIndexInformer, error) { + // TODO: we need to create a way to return a shared index informer that deals with multiple namespaces. + // TODO: This could just create a new index informer but use a multiNamespaced list watcher. + nsToInformer := map[string]toolscache.SharedIndexInformer{} + for ns, c := range m.caches { + var err error + nsToInformer[ns], err = c.GetInformerForKind(gvk) + if err != nil { + return nil, err + } + } + + return &multiNamesapceIndexInformer{ + indexInformers: nsToInformer, + }, nil +} + +// Start - starts all the underlying caches +func (m *multiNamespaceCache) Start(stopCh <-chan struct{}) error { + fmt.Printf("\nstarting caches") + for namespace, c := range m.caches { + fmt.Printf("\tstarting cache: %v\n", namespace) + go c.Start(stopCh) + } + <-stopCh + return nil +} + +// WaitForCacheSync - waits for all the underlying caches to sync +func (m *multiNamespaceCache) WaitForCacheSync(stop <-chan struct{}) bool { + fmt.Printf("wait for cache to sync") + for _, c := range m.caches { + ok := c.WaitForCacheSync(stop) + if !ok { + fmt.Printf("wait for cache to sync - %v", ok) + return ok + } + } + fmt.Printf("wait for cache to sync - %v", true) + return true +} + +// IndexField - adds indexer to all of the underlying caches +func (m *multiNamespaceCache) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error { + for _, c := range m.caches { + err := c.IndexField(obj, field, extractValue) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go new file mode 100644 index 00000000000..b26a3dfc3c6 --- /dev/null +++ b/pkg/manager/manager.go @@ -0,0 +1,80 @@ +package manager + +import ( + sdkcache "github.com/operator-framework/operator-sdk/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + k8smanager "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/runtime/inject" +) + +type manager struct { + k8smanager.Manager + cache cache.Cache + client client.Client +} + +// NewManager - creating manager to handle multiple namespaces +func NewManager(mgr k8smanager.Manager, namespaces []string) (k8smanager.Manager, error) { + c, err := sdkcache.NewMultiNamespaceCache(namespaces, mgr) + if err != nil { + return nil, err + } + + cl := client.DelegatingClient{ + Reader: c, + Writer: mgr.GetClient(), + } + + return &manager{ + Manager: mgr, + client: cl, + cache: c, + }, nil +} + +func (m *manager) SetFields(i interface{}) error { + + /*if err := m.Manager.SetFields(i); err != nil { + return err + }*/ + if _, err := inject.ClientInto(m.client, i); err != nil { + return err + } + if _, err := inject.CacheInto(m.cache, i); err != nil { + return err + } + if _, err := inject.InjectorInto(m.SetFields, i); err != nil { + return err + } + + return nil +} + +func (m *manager) GetCache() cache.Cache { + return m.cache +} + +func (m *manager) GetClient() client.Client { + return m.client +} + +func (m *manager) Add(r k8smanager.Runnable) error { + if err := m.Manager.Add(r); err != nil { + return err + } + if err := m.SetFields(r); err != nil { + return err + } + return nil +} + +func (m *manager) Start(stop <-chan struct{}) error { + go func() { + m.cache.Start(stop) + }() + + m.cache.WaitForCacheSync(stop) + + return m.Manager.Start(stop) +}