diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index cf3887dd22f60..430a2d8b3267b 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -131,7 +131,7 @@ func (c *Configurator) create() (*Scheduler, error) { } // The nominator will be passed all the way to framework instantiation. - nominator := internalqueue.NewPodNominator() + nominator := internalqueue.NewSafePodNominator(c.informerFactory.Core().V1().Pods().Lister()) profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory, frameworkruntime.WithClientSet(c.client), frameworkruntime.WithInformerFactory(c.informerFactory), diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD index 1c4a50ef227fa..844be6162fd02 100644 --- a/pkg/scheduler/internal/queue/BUILD +++ b/pkg/scheduler/internal/queue/BUILD @@ -17,6 +17,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ], @@ -36,7 +37,11 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", + "//vendor/github.com/google/go-cmp/cmp:go_default_library", + "//vendor/github.com/google/go-cmp/cmp/cmpopts:go_default_library", ], ) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 56504cf77b57a..e0384b19aad46 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -29,6 +29,7 @@ import ( "sync" "time" + listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" v1 "k8s.io/api/core/v1" @@ -721,6 +722,8 @@ func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *Unschedulab // may be different than what scheduler has here. We should be able to find pods // by their UID and update/delete them. type nominatedPodMap struct { + // podLister is used to verify if the given pod is alive. + podLister listersv1.PodLister // nominatedPods is a map keyed by a node name and the value is a list of // pods which are nominated to run on the node. These are pods which can be in // the activeQ or unschedulableQ. @@ -744,6 +747,15 @@ func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) { return } } + + if npm.podLister != nil { + // If the pod is not alive, don't contain it. + if _, err := npm.podLister.Pods(p.Namespace).Get(p.Name); err != nil { + klog.V(4).InfoS("Pod %v/%v doesn't exist in podLister, aborting adding it to the nominated map", p.Namespace, p.Name) + return + } + } + npm.nominatedPodToNode[p.UID] = nnn for _, np := range npm.nominatedPods[nnn] { if np.UID == p.UID { @@ -796,8 +808,17 @@ func (npm *nominatedPodMap) UpdateNominatedPod(oldPod, newPod *v1.Pod) { } // NewPodNominator creates a nominatedPodMap as a backing of framework.PodNominator. +// DEPRECATED: use NewSafePodNominator() instead. func NewPodNominator() framework.PodNominator { + return NewSafePodNominator(nil) +} + +// NewSafePodNominator creates a nominatedPodMap as a backing of framework.PodNominator. +// Unlike NewPodNominator, it passes in a podLister so as to check if the pod is alive +// before adding its nominatedNode info. +func NewSafePodNominator(podLister listersv1.PodLister) framework.PodNominator { return &nominatedPodMap{ + podLister: podLister, nominatedPods: make(map[string][]*v1.Pod), nominatedPodToNode: make(map[ktypes.UID]string), } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 476d3b5a983c9..6018da932e47e 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -17,6 +17,7 @@ limitations under the License. package queue import ( + "context" "fmt" "reflect" "strings" @@ -24,10 +25,15 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" "k8s.io/component-base/metrics/testutil" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -134,8 +140,8 @@ func TestPriorityQueue_Add(t *testing.T) { "node1": {&medPriorityPod, &unschedulablePod}, }, } - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != &highPriorityPod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) @@ -186,8 +192,8 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { "node1": {&highPriNominatedPod, &unschedulablePod}, }, } - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod { t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPod.Name, p.Pod.Name) @@ -473,6 +479,57 @@ func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { } } +func TestPriorityQueue_NominatedPodDeleted(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + deletePod bool + want bool + }{ + { + name: "alive pod gets added into PodNominator", + pod: &medPriorityPod, + want: true, + }, + { + name: "deleted pod shouldn't be added into PodNominator", + pod: &highPriNominatedPod, + deletePod: true, + want: false, + }, + { + name: "pod without .status.nominatedPodName specified shouldn't be added into PodNominator", + pod: &highPriorityPod, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cs := fake.NewSimpleClientset(tt.pod) + informerFactory := informers.NewSharedInformerFactory(cs, 0) + podLister := informerFactory.Core().V1().Pods().Lister() + + // Build a PriorityQueue. + q := NewPriorityQueue(newDefaultQueueSort(), WithPodNominator(NewSafePodNominator(podLister))) + ctx := context.Background() + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + + if tt.deletePod { + // Simulate that the test pod gets deleted physically. + informerFactory.Core().V1().Pods().Informer().GetStore().Delete(tt.pod) + } + + q.AddNominatedPod(tt.pod, tt.pod.Status.NominatedNodeName) + + if got := len(q.NominatedPodsForNode(tt.pod.Status.NominatedNodeName)) == 1; got != tt.want { + t.Errorf("Want %v, but got %v", tt.want, got) + } + }) + } +} + func TestPriorityQueue_PendingPods(t *testing.T) { makeSet := func(pods []*v1.Pod) map[*v1.Pod]struct{} { pendingSet := map[*v1.Pod]struct{}{} @@ -520,15 +577,15 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {&unschedulablePod}, }, } - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) } // List of nominated pods shouldn't change after popping them from the queue. - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after popping pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after popping pods (-want, +got):\n%s", diff) } // Update one of the nominated pods that doesn't have nominatedNodeName in the // pod object. It should be updated correctly. @@ -545,8 +602,8 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {&unschedulablePod}, }, } - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after updating pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after updating pods (-want, +got):\n%s", diff) } // Delete a nominated pod that doesn't have nominatedNodeName in the pod @@ -562,8 +619,8 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {&unschedulablePod}, }, } - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after deleting pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after deleting pods (-want, +got):\n%s", diff) } }