Skip to content

Commit

Permalink
Merge pull request #106816 from Huang-Wei/fix-nnn-not-cleared
Browse files Browse the repository at this point in the history
clear pod's .status.nominatedNodeName when necessary
  • Loading branch information
k8s-ci-robot authored Dec 17, 2021
2 parents 3623121 + 2433b08 commit 712745c
Show file tree
Hide file tree
Showing 12 changed files with 340 additions and 202 deletions.
32 changes: 30 additions & 2 deletions pkg/scheduler/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,16 +608,44 @@ type Handle interface {
Parallelizer() parallelize.Parallelizer
}

// NominatingMode tells the scheduler framework how a pod's nominated node
// name should be handled when a NominatingInfo is acted upon.
type NominatingMode int

// Supported nominating modes.
const (
	// ModeNoop is the zero value. It signals that no change to the pod's
	// nomination is requested.
	ModeNoop NominatingMode = iota
	// ModeOverride signals that the accompanying NominatedNodeName should
	// replace the pod's current nomination. An empty name is used to clear it.
	ModeOverride
)

// NominatingInfo pairs a candidate node name with the mode that says how
// that name should be applied to a pod's nomination.
type NominatingInfo struct {
	// NominatedNodeName is the node name to nominate; it may be empty.
	NominatedNodeName string
	// NominatingMode selects how NominatedNodeName is to be treated.
	NominatingMode NominatingMode
}

// PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase.
type PostFilterResult struct {
NominatedNodeName string
*NominatingInfo
}

// NewPostFilterResultWithNominatedNode returns a PostFilterResult whose
// NominatingInfo carries the given node name in ModeOverride. Passing an
// empty name yields a result that still overrides, with an empty node name.
func NewPostFilterResultWithNominatedNode(name string) *PostFilterResult {
	info := &NominatingInfo{
		NominatingMode:    ModeOverride,
		NominatedNodeName: name,
	}
	return &PostFilterResult{NominatingInfo: info}
}

// Mode reports the nominating mode carried by ni. It is safe to call on a
// nil receiver, in which case ModeNoop is returned.
func (ni *NominatingInfo) Mode() NominatingMode {
	if ni != nil {
		return ni.NominatingMode
	}
	// A missing NominatingInfo means nothing was requested.
	return ModeNoop
}

// PodNominator abstracts operations to maintain nominated Pods.
type PodNominator interface {
// AddNominatedPod adds the given pod to the nominator or
// updates it if it already exists.
AddNominatedPod(pod *PodInfo, nodeName string)
AddNominatedPod(pod *PodInfo, nominatingInfo *NominatingInfo)
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist.
DeleteNominatedPodIfExists(pod *v1.Pod)
// UpdateNominatedPod updates the <oldPod> with <newPod>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestPostFilter(t *testing.T) {
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable),
},
wantResult: &framework.PostFilterResult{NominatedNodeName: "node1"},
wantResult: framework.NewPostFilterResultWithNominatedNode("node1"),
wantStatus: framework.NewStatus(framework.Success),
},
{
Expand All @@ -176,7 +176,7 @@ func TestPostFilter(t *testing.T) {
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/1 nodes are available: 1 No victims found on node node1 for preemptor pod p."),
},
{
Expand All @@ -191,7 +191,7 @@ func TestPostFilter(t *testing.T) {
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/1 nodes are available: 1 Preemption is not helpful for scheduling."),
},
{
Expand All @@ -209,7 +209,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: &framework.PostFilterResult{NominatedNodeName: "node2"},
wantResult: framework.NewPostFilterResultWithNominatedNode("node2"),
wantStatus: framework.NewStatus(framework.Success),
},
{
Expand All @@ -227,10 +227,8 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
extender: &st.FakeExtender{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
wantResult: &framework.PostFilterResult{
NominatedNodeName: "node1",
},
extender: &st.FakeExtender{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
wantResult: framework.NewPostFilterResultWithNominatedNode("node1"),
wantStatus: framework.NewStatus(framework.Success),
},
{
Expand All @@ -248,7 +246,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 2 Insufficient cpu."),
},
{
Expand All @@ -266,7 +264,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 1 Insufficient cpu, 1 No victims found on node node1 for preemptor pod p."),
},
{
Expand All @@ -286,7 +284,7 @@ func TestPostFilter(t *testing.T) {
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable),
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/4 nodes are available: 2 Insufficient cpu, 2 Preemption is not helpful for scheduling."),
},
{
Expand Down Expand Up @@ -317,9 +315,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: &framework.PostFilterResult{
NominatedNodeName: "node2",
},
wantResult: framework.NewPostFilterResultWithNominatedNode("node2"),
wantStatus: framework.NewStatus(framework.Success),
},
}
Expand Down Expand Up @@ -1465,7 +1461,7 @@ func TestPreempt(t *testing.T) {
extenders []*st.FakeExtender
nodeNames []string
registerPlugin st.RegisterPluginFunc
expectedNode string
want *framework.PostFilterResult
expectedPods []string // list of preempted pods
}{
{
Expand All @@ -1479,7 +1475,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node1", "node2", "node3"},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
{
Expand All @@ -1497,7 +1493,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node-a/zone1", "node-b/zone1", "node-x/zone2"},
registerPlugin: st.RegisterPluginAsExtensions(podtopologyspread.Name, podtopologyspread.New, "PreFilter", "Filter"),
expectedNode: "node-b",
want: framework.NewPostFilterResultWithNominatedNode("node-b"),
expectedPods: []string{"p-b1"},
},
{
Expand All @@ -1514,7 +1510,7 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
{
Expand All @@ -1530,7 +1526,7 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.FalsePredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "",
want: nil,
expectedPods: []string{},
},
{
Expand All @@ -1547,7 +1543,7 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
{
Expand All @@ -1564,8 +1560,8 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.TruePredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
//sum of priorities of all victims on node1 is larger than node2, node2 is chosen.
expectedNode: "node2",
// sum of priorities of all victims on node1 is larger than node2, node2 is chosen.
want: framework.NewPostFilterResultWithNominatedNode("node2"),
expectedPods: []string{"p2.1"},
},
{
Expand All @@ -1579,7 +1575,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node1", "node2", "node3"},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "",
want: nil,
expectedPods: nil,
},
{
Expand All @@ -1593,7 +1589,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node1", "node2", "node3"},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
}
Expand Down Expand Up @@ -1691,11 +1687,8 @@ func TestPreempt(t *testing.T) {
if !status.IsSuccess() && !status.IsUnschedulable() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
if res != nil && len(res.NominatedNodeName) != 0 && res.NominatedNodeName != test.expectedNode {
t.Errorf("expected node: %v, got: %v", test.expectedNode, res.NominatedNodeName)
}
if res != nil && len(res.NominatedNodeName) == 0 && len(test.expectedNode) != 0 {
t.Errorf("expected node: %v, got: nothing", test.expectedNode)
if diff := cmp.Diff(test.want, res); diff != "" {
t.Errorf("Unexpected status (-want, +got):\n%s", diff)
}
if len(deletedPodNames) != len(test.expectedPods) {
t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(deletedPodNames))
Expand All @@ -1712,7 +1705,7 @@ func TestPreempt(t *testing.T) {
t.Errorf("pod %v is not expected to be a victim.", victimName)
}
}
if res != nil {
if res != nil && res.NominatingInfo != nil {
test.pod.Status.NominatedNodeName = res.NominatedNodeName
}

Expand All @@ -1730,7 +1723,7 @@ func TestPreempt(t *testing.T) {
if !status.IsSuccess() && !status.IsUnschedulable() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
if res != nil && len(deletedPodNames) > 0 {
if res != nil && res.NominatingInfo != nil && len(deletedPodNames) > 0 {
t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", res.NominatedNodeName)
}
})
Expand Down
18 changes: 15 additions & 3 deletions pkg/scheduler/framework/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,19 @@ type Evaluator struct {
Interface
}

// Preempt returns a PostFilterResult carrying suggested nominatedNodeName, along with a Status.
// The semantics of the returned <PostFilterResult, Status> vary by scenario:
// - <nil, Error>. This denotes it's a transient/rare error that may be self-healed in future cycles.
// - <nil, Unschedulable>. This status is mostly expected, e.g. when the preemptor is waiting for the
// victims to be fully terminated.
// - In both cases above, a nil PostFilterResult is returned to keep the pod's nominatedNodeName unchanged.
//
// - <non-nil PostFilterResult, Unschedulable>. It indicates the pod cannot be scheduled even with preemption.
// In this case, a non-nil PostFilterResult is returned and result.NominatingMode instructs how to deal with
// the nominatedNodeName.
// - <non-nil PostFilterResult, Success>. It's the regular happy path
// and the non-empty nominatedNodeName will be applied to the preemptor pod.
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {

// 0) Fetch the latest version of <pod>.
// It's safe to directly fetch pod here. Because the informer cache has already been
// initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc().
Expand Down Expand Up @@ -158,7 +169,8 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
// Leave FailedPlugins as nil as it won't be used on moving Pods.
},
}
return nil, framework.NewStatus(framework.Unschedulable, fitError.Error())
// Specify nominatedNodeName to clear the pod's nominatedNodeName status, if applicable.
return framework.NewPostFilterResultWithNominatedNode(""), framework.NewStatus(framework.Unschedulable, fitError.Error())
}

// 3) Interact with registered Extenders to filter out some candidates if needed.
Expand All @@ -178,7 +190,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
return nil, status
}

return &framework.PostFilterResult{NominatedNodeName: bestCandidate.Name()}, framework.NewStatus(framework.Success)
return framework.NewPostFilterResultWithNominatedNode(bestCandidate.Name()), framework.NewStatus(framework.Success)
}

// FindCandidates calculates a slice of preemption candidates.
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/framework/runtime/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,18 +711,22 @@ func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framewo
}()

statuses := make(framework.PluginToStatus)
// `result` records the last meaningful (non-noop) PostFilterResult.
var result *framework.PostFilterResult
for _, pl := range f.postFilterPlugins {
r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
if s.IsSuccess() {
return r, s
} else if !s.IsUnschedulable() {
// Any status other than Success or Unschedulable is Error.
return nil, framework.AsStatus(s.AsError())
} else if r != nil && r.Mode() != framework.ModeNoop {
result = r
}
statuses[pl.Name()] = s
}

return nil, statuses.Merge()
return result, statuses.Merge()
}

func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/scheduler/framework/runtime/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ func TestRunScorePlugins(t *testing.T) {
},
{
name: "single ScoreWithNormalize plugin",
//registry: registry,
// registry: registry,
plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1),
pluginConfigs: []config.PluginConfig{
{
Expand Down Expand Up @@ -1596,7 +1596,9 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {

podNominator := internalqueue.NewPodNominator(nil)
if tt.nominatedPod != nil {
podNominator.AddNominatedPod(framework.NewPodInfo(tt.nominatedPod), nodeName)
podNominator.AddNominatedPod(
framework.NewPodInfo(tt.nominatedPod),
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: nodeName})
}
profile := config.KubeSchedulerProfile{Plugins: cfgPls}
f, err := newFrameworkWithQueueSortAndBind(registry, profile, WithPodNominator(podNominator))
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/generic_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,8 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
t.Fatal(err)
}
fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1")
fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}),
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"})

_, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod)

Expand Down
Loading

0 comments on commit 712745c

Please sign in to comment.