Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP/POC: pkg/cache;pkg/manager - Adding multi name spaced cache #785

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions pkg/cache/multi_informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package cache

import (
"fmt"
"time"

toolscache "k8s.io/client-go/tools/cache"
)

type multiNamesapceIndexInformer struct {
indexInformers map[string]toolscache.SharedIndexInformer
}

func (m *multiNamesapceIndexInformer) AddIndexers(indexers toolscache.Indexers) error {
return nil
}

func (m *multiNamesapceIndexInformer) GetIndexer() toolscache.Indexer {
return nil
}

func (m *multiNamesapceIndexInformer) AddEventHandler(handler toolscache.ResourceEventHandler) {
for namespace, i := range m.indexInformers {
fmt.Printf("\nadding event handler: %v for namespace: %v\n", i, namespace)
i.AddEventHandler(handler)
}
}

func (m *multiNamesapceIndexInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) {
for _, i := range m.indexInformers {
i.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
}
}

func (m *multiNamesapceIndexInformer) GetStore() toolscache.Store {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hasbro17 This is the stuff that would be hard to expose. This, GetController and the LastSyncResourceVersion()

I think it may be fine to not have access to the controller or the store. People should very rarely need access to these from what I have seen. (especially the Store). Also have already set up the watches and the like so not having access to the LastSyncResourceVersion or the controller shouldn't cause issues as far as I can tell.

I think that is the biggest drawback from this approach.

return nil
}

func (m *multiNamesapceIndexInformer) GetController() toolscache.Controller {
return nil

}

func (m *multiNamesapceIndexInformer) Run(stopCh <-chan struct{}) {
for _, i := range m.indexInformers {
i.Run(stopCh)
}
}

func (m *multiNamesapceIndexInformer) HasSynced() bool {
fmt.Printf("has synced")
for _, i := range m.indexInformers {
if synced := i.HasSynced(); !synced {
fmt.Printf("has synced - %v", synced)
return synced
}
}
fmt.Printf("has synced- true")
return true
}

func (m *multiNamesapceIndexInformer) LastSyncResourceVersion() string {
return ""
}
132 changes: 132 additions & 0 deletions pkg/cache/multi_namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package cache

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
toolscache "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

// MultiNamespaceCache - allows for using multiple namespaces for the cache.
type multiNamespaceCache struct {
caches map[string]cache.Cache
}

// NewMultiNamespaceCache - creates a new cache that can handle multiple namespaces
func NewMultiNamespaceCache(namespaces []string, mgr manager.Manager) (cache.Cache, error) {
m := multiNamespaceCache{
caches: map[string]cache.Cache{},
}
for _, namespace := range namespaces {
o := cache.Options{
Scheme: mgr.GetScheme(),
Mapper: mgr.GetRESTMapper(),
Namespace: namespace,
}

cache, err := cache.New(mgr.GetConfig(), o)
if err != nil {
return nil, err
}
m.caches[namespace] = cache
}
return &m, nil
}

var _ cache.Cache = &multiNamespaceCache{}

// Get - Gets the object from the cache based on the key
func (m *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj runtime.Object) error {
c, ok := m.caches[key.Namespace]
if !ok {
return fmt.Errorf("unknown namespace for cache: %v", key.Namespace)
}
return c.Get(ctx, key, obj)
}

// List - Gets the list from the cache.
func (m *multiNamespaceCache) List(ctx context.Context, opts *client.ListOptions, list runtime.Object) error {
// TODO: we may want to assume that empty namespace means seach in all namespaces I am watching.
c, ok := m.caches[opts.Namespace]
if !ok {
return fmt.Errorf("unknown namespace for cache: %v", opts.Namespace)
}
return c.List(ctx, opts, list)
}

// GetInformer - get an informer for an obj
func (m *multiNamespaceCache) GetInformer(obj runtime.Object) (toolscache.SharedIndexInformer, error) {
// TODO: we need to create a way to return a shared index informer that deals with multiple namespaces.
// TODO: This could just create a new index informer but use a multiNamespaced list watcher.
nsToInformer := map[string]toolscache.SharedIndexInformer{}
for ns, c := range m.caches {
var err error
nsToInformer[ns], err = c.GetInformer(obj)
if err != nil {
return nil, err
}
}

return &multiNamesapceIndexInformer{
indexInformers: nsToInformer,
}, nil
}

// GetInformerForKind - get an informer for an kind
func (m *multiNamespaceCache) GetInformerForKind(gvk schema.GroupVersionKind) (toolscache.SharedIndexInformer, error) {
// TODO: we need to create a way to return a shared index informer that deals with multiple namespaces.
// TODO: This could just create a new index informer but use a multiNamespaced list watcher.
nsToInformer := map[string]toolscache.SharedIndexInformer{}
for ns, c := range m.caches {
var err error
nsToInformer[ns], err = c.GetInformerForKind(gvk)
if err != nil {
return nil, err
}
}

return &multiNamesapceIndexInformer{
indexInformers: nsToInformer,
}, nil
}

// Start - starts all the underlying caches
func (m *multiNamespaceCache) Start(stopCh <-chan struct{}) error {
fmt.Printf("\nstarting caches")
for namespace, c := range m.caches {
fmt.Printf("\tstarting cache: %v\n", namespace)
go c.Start(stopCh)
}
<-stopCh
return nil
}

// WaitForCacheSync - waits for all the underlying caches to sync
func (m *multiNamespaceCache) WaitForCacheSync(stop <-chan struct{}) bool {
fmt.Printf("wait for cache to sync")
for _, c := range m.caches {
ok := c.WaitForCacheSync(stop)
if !ok {
fmt.Printf("wait for cache to sync - %v", ok)
return ok
}
}
fmt.Printf("wait for cache to sync - %v", true)
return true
}

// IndexField - adds indexer to all of the underlying caches
func (m *multiNamespaceCache) IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error {
for _, c := range m.caches {
err := c.IndexField(obj, field, extractValue)
if err != nil {
return err
}
}
return nil
}
80 changes: 80 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package manager

import (
sdkcache "github.com/operator-framework/operator-sdk/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
k8smanager "sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
)

type manager struct {
k8smanager.Manager
cache cache.Cache
client client.Client
}

// NewManager - creating manager to handle multiple namespaces
func NewManager(mgr k8smanager.Manager, namespaces []string) (k8smanager.Manager, error) {
c, err := sdkcache.NewMultiNamespaceCache(namespaces, mgr)
if err != nil {
return nil, err
}

cl := client.DelegatingClient{
Reader: c,
Writer: mgr.GetClient(),
}

return &manager{
Manager: mgr,
client: cl,
cache: c,
}, nil
}

func (m *manager) SetFields(i interface{}) error {

/*if err := m.Manager.SetFields(i); err != nil {
return err
}*/
if _, err := inject.ClientInto(m.client, i); err != nil {
return err
}
if _, err := inject.CacheInto(m.cache, i); err != nil {
return err
}
if _, err := inject.InjectorInto(m.SetFields, i); err != nil {
return err
}

return nil
}

func (m *manager) GetCache() cache.Cache {
return m.cache
}

func (m *manager) GetClient() client.Client {
return m.client
}

func (m *manager) Add(r k8smanager.Runnable) error {
if err := m.Manager.Add(r); err != nil {
return err
}
if err := m.SetFields(r); err != nil {
return err
}
return nil
}

func (m *manager) Start(stop <-chan struct{}) error {
go func() {
m.cache.Start(stop)
}()

m.cache.WaitForCacheSync(stop)

return m.Manager.Start(stop)
}