reuse InformerFactory in scheduler tests (kubernetes#107835)
* reuse informer factory in scheduler tests

Signed-off-by: kerthcet <kerthcet@gmail.com>

* avoid constructing two informer factories

Signed-off-by: kerthcet <kerthcet@gmail.com>

* fix informerFactory instantiation error

Signed-off-by: kerthcet <kerthcet@gmail.com>
kerthcet authored Feb 10, 2022
1 parent 0dcd6ea commit 62eb70c
Showing 3 changed files with 42 additions and 41 deletions.
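The theme of the diff below: each test previously built two informer factories, one hidden inside the test-queue helper and one constructed explicitly, each backed by its own fake clientset. After the change, a single factory is created up front and shared. A minimal sketch of the resulting pattern, using only client-go (the test name and body are illustrative, not part of the commit):

package scheduler_test

import (
	"context"
	"testing"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

// TestSharedFactory sketches the shared-factory pattern this commit adopts.
func TestSharedFactory(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// One fake client and one factory serve every component under test.
	client := fake.NewSimpleClientset()
	informerFactory := informers.NewSharedInformerFactory(client, 0)

	// Each consumer registers its informers on the same factory...
	podInformer := informerFactory.Core().V1().Pods().Informer()

	// ...so a single Start/WaitForCacheSync covers all of them.
	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())

	if !podInformer.HasSynced() {
		t.Fatal("pod informer did not sync")
	}
}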
23 changes: 14 additions & 9 deletions pkg/scheduler/eventhandlers_test.go
@@ -371,8 +371,9 @@ func TestAddAllEventHandlers(t *testing.T) {
 			name:   "default handlers in framework",
 			gvkMap: map[framework.GVK]framework.ActionType{},
 			expectStaticInformers: map[reflect.Type]bool{
-				reflect.TypeOf(&v1.Pod{}):  true,
-				reflect.TypeOf(&v1.Node{}): true,
+				reflect.TypeOf(&v1.Pod{}):       true,
+				reflect.TypeOf(&v1.Node{}):      true,
+				reflect.TypeOf(&v1.Namespace{}): true,
 			},
 			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
 		},
@@ -386,6 +387,7 @@ func TestAddAllEventHandlers(t *testing.T) {
 			expectStaticInformers: map[reflect.Type]bool{
 				reflect.TypeOf(&v1.Pod{}):                            true,
 				reflect.TypeOf(&v1.Node{}):                           true,
+				reflect.TypeOf(&v1.Namespace{}):                      true,
 				reflect.TypeOf(&v1.PersistentVolume{}):               true,
 				reflect.TypeOf(&storagev1beta1.CSIStorageCapacity{}): true,
 			},
@@ -398,8 +400,9 @@ func TestAddAllEventHandlers(t *testing.T) {
 				"cronjobs.v1.batch": framework.Delete,
 			},
 			expectStaticInformers: map[reflect.Type]bool{
-				reflect.TypeOf(&v1.Pod{}):  true,
-				reflect.TypeOf(&v1.Node{}): true,
+				reflect.TypeOf(&v1.Pod{}):       true,
+				reflect.TypeOf(&v1.Node{}):      true,
+				reflect.TypeOf(&v1.Namespace{}): true,
 			},
 			expectDynamicInformers: map[schema.GroupVersionResource]bool{
 				{Group: "apps", Version: "v1", Resource: "daemonsets"}: true,
@@ -413,8 +416,9 @@ func TestAddAllEventHandlers(t *testing.T) {
 				"custommetrics.v1beta1": framework.Update,
 			},
 			expectStaticInformers: map[reflect.Type]bool{
-				reflect.TypeOf(&v1.Pod{}):  true,
-				reflect.TypeOf(&v1.Node{}): true,
+				reflect.TypeOf(&v1.Pod{}):       true,
+				reflect.TypeOf(&v1.Node{}):      true,
+				reflect.TypeOf(&v1.Namespace{}): true,
 			},
 			expectDynamicInformers: map[schema.GroupVersionResource]bool{
 				{Group: "apps", Version: "v1", Resource: "daemonsets"}: true,
@@ -433,13 +437,14 @@ func TestAddAllEventHandlers(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
+
+			informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
+			schedulingQueue := queue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
 			testSched := Scheduler{
 				StopEverything:  ctx.Done(),
-				SchedulingQueue: queue.NewTestQueue(ctx, nil),
+				SchedulingQueue: schedulingQueue,
 			}
 
-			client := fake.NewSimpleClientset()
-			informerFactory := informers.NewSharedInformerFactory(client, 0)
 			dynclient := dyfake.NewSimpleDynamicClient(scheme)
 			dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)
 
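Note the new reflect.TypeOf(&v1.Namespace{}) entries in expectStaticInformers: once the queue is built on the test's own factory, the informers that NewPriorityQueue registers on the factory it is handed become visible to the assertion, which is presumably why the namespace informer now shows up alongside pods and nodes. The expectation map is keyed by reflect.Type, which is exactly what SharedInformerFactory.WaitForCacheSync returns; a small sketch (the helper name is illustrative, not from the commit):

package scheduler_test

import (
	"testing"

	"k8s.io/client-go/informers"
)

// dumpStaticInformers logs which static informers a factory has registered.
// WaitForCacheSync reports sync status keyed by reflect.Type, the same keys
// the test's expectStaticInformers map uses.
func dumpStaticInformers(t *testing.T, f informers.SharedInformerFactory, stopCh <-chan struct{}) {
	for informerType, synced := range f.WaitForCacheSync(stopCh) {
		t.Logf("static informer %v (synced=%v)", informerType, synced)
	}
}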
19 changes: 14 additions & 5 deletions pkg/scheduler/internal/queue/testing.go
@@ -25,6 +25,11 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
+// NewTestQueue creates a priority queue with an empty informer factory.
+func NewTestQueue(ctx context.Context, lessFn framework.LessFunc, opts ...Option) *PriorityQueue {
+	return NewTestQueueWithObjects(ctx, lessFn, nil, opts...)
+}
+
 // NewTestQueueWithObjects creates a priority queue with an informer factory
 // populated with the provided objects.
 func NewTestQueueWithObjects(
@@ -34,13 +39,17 @@ func NewTestQueueWithObjects(
 	opts ...Option,
 ) *PriorityQueue {
 	informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0)
+	return NewTestQueueWithInformerFactory(ctx, lessFn, informerFactory, opts...)
+}
+
+func NewTestQueueWithInformerFactory(
+	ctx context.Context,
+	lessFn framework.LessFunc,
+	informerFactory informers.SharedInformerFactory,
+	opts ...Option,
+) *PriorityQueue {
 	pq := NewPriorityQueue(lessFn, informerFactory, opts...)
 	informerFactory.Start(ctx.Done())
 	informerFactory.WaitForCacheSync(ctx.Done())
 	return pq
 }
-
-// NewTestQueue creates a priority queue with an empty informer factory.
-func NewTestQueue(ctx context.Context, lessFn framework.LessFunc, opts ...Option) *PriorityQueue {
-	return NewTestQueueWithObjects(ctx, lessFn, nil, opts...)
-}
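After the refactor the three helpers layer cleanly: NewTestQueue delegates to NewTestQueueWithObjects, which builds a factory from a fake clientset and delegates to NewTestQueueWithInformerFactory, which constructs the queue, starts the factory, and waits for cache sync. A sketch of a caller that supplies its own factory so it can share it with other components (compilable only inside k8s.io/kubernetes, since the queue package is internal; the helper name is illustrative):

package scheduler

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"

	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
)

// newSharedQueue builds one factory up front so the caller can hand the same
// factory to other components (e.g. AddAllEventHandlers) afterwards.
func newSharedQueue(ctx context.Context) (*internalqueue.PriorityQueue, informers.SharedInformerFactory) {
	informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
	// The helper starts the factory and waits for its caches to sync.
	q := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
	return q, informerFactory
}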
41 changes: 14 additions & 27 deletions pkg/scheduler/scheduler_test.go
@@ -38,6 +38,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
 	clientsetfake "k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/kubernetes/scheme"
 	clienttesting "k8s.io/client-go/testing"
@@ -390,13 +391,6 @@ func TestSchedulerScheduleOne(t *testing.T) {
 		},
 	}
 
-	stop := make(chan struct{})
-	defer close(stop)
-	informerFactory := informers.NewSharedInformerFactory(client, 0)
-
-	informerFactory.Start(stop)
-	informerFactory.WaitForCacheSync(stop)
-
 	for _, item := range table {
 		t.Run(item.name, func(t *testing.T) {
 			var gotError error
@@ -440,6 +434,7 @@
 
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
+
 			s := &Scheduler{
 				SchedulerCache:  sCache,
 				Algorithm:       item.algo,
@@ -647,15 +642,13 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
 	pod := podWithPort("pod.Name", "", 8080)
 	node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
 	scache.AddNode(&node)
-	client := clientsetfake.NewSimpleClientset(&node)
-	informerFactory := informers.NewSharedInformerFactory(client, 0)
 
 	fns := []st.RegisterPluginFunc{
 		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
 		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 		st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
 	}
-	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, informerFactory, pod, &node, fns...)
+	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, pod, &node, fns...)
 
 	waitPodExpireChan := make(chan struct{})
 	timeout := make(chan struct{})
@@ -714,14 +707,12 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
 	firstPod := podWithPort("pod.Name", "", 8080)
 	node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
 	scache.AddNode(&node)
-	client := clientsetfake.NewSimpleClientset(&node)
-	informerFactory := informers.NewSharedInformerFactory(client, 0)
 	fns := []st.RegisterPluginFunc{
 		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
 		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 		st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
 	}
-	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, informerFactory, firstPod, &node, fns...)
+	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, firstPod, &node, fns...)
 
 	// We use conflicted pod ports to incur fit predicate failure.
 	secondPod := podWithPort("bar", "", 8080)
@@ -780,12 +771,8 @@
 // queuedPodStore: pods queued before processing.
 // cache: scheduler cache that might contain assumed pods.
 func setupTestSchedulerWithOnePodOnNode(ctx context.Context, t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
-	informerFactory informers.SharedInformerFactory, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
-
-	scheduler, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...)
-
-	informerFactory.Start(ctx.Done())
-	informerFactory.WaitForCacheSync(ctx.Done())
+	pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
+	scheduler, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, nil, nil, fns...)
 
 	queuedPodStore.Add(pod)
 	// queuedPodStore: [foo:8080]
@@ -850,8 +837,6 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
 		nodes = append(nodes, &node)
 		objects = append(objects, &node)
 	}
-	client := clientsetfake.NewSimpleClientset(objects...)
-	informerFactory := informers.NewSharedInformerFactory(client, 0)
 
 	// Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary.
 	failedNodeStatues := framework.NodeToStatusMap{}
@@ -867,10 +852,9 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
 		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 		st.RegisterPluginAsExtensions(noderesources.Name, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewFit), "Filter", "PreFilter"),
 	}
-	scheduler, _, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...)
 
-	informerFactory.Start(ctx.Done())
-	informerFactory.WaitForCacheSync(ctx.Done())
+	informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objects...), 0)
+	scheduler, _, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...)
 
 	queuedPodStore.Add(podWithTooBigResourceRequests)
 	scheduler.scheduleOne(ctx)
@@ -916,6 +900,11 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, s
 		recorder = &events.FakeRecorder{}
 	}
 
+	if informerFactory == nil {
+		informerFactory = informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
+	}
+	schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
+
 	fwk, _ := st.NewFramework(
 		fns,
 		testSchedulerName,
@@ -945,7 +934,7 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, s
 			testSchedulerName: fwk,
 		},
 		client:          client,
-		SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
+		SchedulingQueue: schedulingQueue,
 	}
 
 	return sched, bindingChan, errChan
@@ -975,8 +964,6 @@ func setupTestSchedulerWithVolumeBinding(ctx context.Context, volumeBinder volum
 		}, "PreFilter", "Filter", "Reserve", "PreBind"),
 	}
 	s, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, broadcaster, fns...)
-	informerFactory.Start(ctx.Done())
-	informerFactory.WaitForCacheSync(ctx.Done())
 	return s, bindingChan, errChan
 }
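The nil check added to setupTestScheduler is what lets setupTestSchedulerWithOnePodOnNode drop its informerFactory parameter entirely: callers that need to seed objects (such as TestSchedulerFailedSchedulingReasons) still pass a factory, and everyone else passes nil and gets an empty one. The convention in isolation, as a hedged sketch (function name illustrative, not from the commit):

package scheduler

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

// factoryOrDefault mirrors the nil-defaulting convention setupTestScheduler
// adopts: a nil factory is replaced by one backed by an empty fake clientset.
func factoryOrDefault(informerFactory informers.SharedInformerFactory) informers.SharedInformerFactory {
	if informerFactory == nil {
		// An empty fake clientset suffices when the test has no objects to seed.
		informerFactory = informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
	}
	return informerFactory
}

One quirk worth noting: scheduler_test.go now imports k8s.io/client-go/kubernetes/fake both bare and under the pre-existing clientsetfake alias. Go permits the same package under two names, though the new call sites could equally have reused clientsetfake.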
