Skip to content

Commit

Permalink
improvement: limits the SharedInformerFactory to the specified namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
yunlzheng committed May 12, 2020
1 parent 0498db8 commit a98fc76
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
39 changes: 36 additions & 3 deletions pkg/apiserver/cluster/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,22 @@ func Construct(client kubernetes.Interface, config *rest.Config) (w Watcher, err
return
}

// PodListener PodListener
func PodListener(client kubernetes.Interface, stopCh <-chan struct{}) (lister v1.PodLister, err error) {
// PodListenerWithNamespace PodListener
func PodListenerWithNamespace(client kubernetes.Interface, namespace string, stopCh <-chan struct{}) (lister v1.PodLister, err error) {
w := Watcher{Client: client}
lister, err = w.Pods(stopCh)
lister, err = w.PodsWithNamespace(namespace, stopCh)
if err != nil {
return
}
return
}

func informerFactoryWithNamespace(w *Watcher, namespace string) (factory informers.SharedInformerFactory) {
resyncPeriod := 30 * time.Minute
factory = informers.NewSharedInformerFactoryWithOptions(w.Client, resyncPeriod, informers.WithNamespace(namespace))
return
}

func informerFactory(w *Watcher) (factory informers.SharedInformerFactory) {
resyncPeriod := 30 * time.Minute
factory = informers.NewSharedInformerFactory(w.Client, resyncPeriod)
Expand Down Expand Up @@ -125,6 +131,33 @@ func podDeleted(obj interface{}) {
// }
}

// PodsWithNamespace watch pods change
func (w *Watcher) PodsWithNamespace(namespace string, stopCh <-chan struct{}) (lister v1.PodLister, err error) {
factory := informerFactoryWithNamespace(w, namespace)
podInformer := factory.Core().V1().Pods()
informer := podInformer.Informer()

defer runtime.HandleCrash()

factory.Start(stopCh)

if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
err = errTimeout
runtime.HandleError(err)
return
}

informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
// AddFunc: podCreated,
DeleteFunc: podDeleted,
},
)

lister = podInformer.Lister()
return
}

// Pods watch pods change
func (w *Watcher) Pods(stopCh <-chan struct{}) (lister v1.PodLister, err error) {
factory := informerFactory(w)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kt/cluster/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (k *Kubernetes) ServiceHosts(namespace string) (hosts map[string]string) {
func waitPodReadyUsingInformer(namespace, name string, clientset kubernetes.Interface) (pod v1.Pod, err error) {
stopSignal := make(chan struct{})
defer close(stopSignal)
podListener, err := clusterWatcher.PodListener(clientset, stopSignal)
podListener, err := clusterWatcher.PodListenerWithNamespace(clientset, namespace, stopSignal)
if err != nil {
return
}
Expand Down

0 comments on commit a98fc76

Please sign in to comment.