Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clear pod's .status.nominatedNodeName when necessary #106816

Merged
merged 1 commit into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions pkg/scheduler/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,16 +608,44 @@ type Handle interface {
Parallelizer() parallelize.Parallelizer
}

type NominatingMode int

const (
ModeNoop NominatingMode = iota
ModeOverride
)

type NominatingInfo struct {
NominatedNodeName string
NominatingMode NominatingMode
}

// PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase.
type PostFilterResult struct {
NominatedNodeName string
*NominatingInfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to be a pointer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not mandatory. But it can simplify the code on caller side. (just pass in a nil to indicate its mode is Noop)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's good when passing NominatingInfo to a function.

But when returning a Noop result, we are just using PostFilterResult=nil, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But when returning a Noop result, we are just using PostFilterResult=nil, no?

A Noop result equals a nil NominatingInfo, but not quite equivalent to PostFilterResult as NominatingInfo is a sub-field of PostFilterResult.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I'm saying is that you are already using PostFilterResult=nil to signal Noop, so I don't see much point on having a pointer here. However, it's such a minor detail that I don't feel strongly about it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Given a pointer NominatingInfo can be carried over directly to other places (like recordSchedulingFailure()) that need a pointer, I'm inclined to keep it as-is.

}

func NewPostFilterResultWithNominatedNode(name string) *PostFilterResult {
return &PostFilterResult{
NominatingInfo: &NominatingInfo{
NominatedNodeName: name,
NominatingMode: ModeOverride,
},
}
}

func (ni *NominatingInfo) Mode() NominatingMode {
if ni == nil {
return ModeNoop
}
return ni.NominatingMode
}

// PodNominator abstracts operations to maintain nominated Pods.
type PodNominator interface {
// AddNominatedPod adds the given pod to the nominator or
// updates it if it already exists.
AddNominatedPod(pod *PodInfo, nodeName string)
AddNominatedPod(pod *PodInfo, nominatingInfo *NominatingInfo)
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist.
DeleteNominatedPodIfExists(pod *v1.Pod)
// UpdateNominatedPod updates the <oldPod> with <newPod>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestPostFilter(t *testing.T) {
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable),
},
wantResult: &framework.PostFilterResult{NominatedNodeName: "node1"},
wantResult: framework.NewPostFilterResultWithNominatedNode("node1"),
wantStatus: framework.NewStatus(framework.Success),
},
{
Expand All @@ -176,7 +176,7 @@ func TestPostFilter(t *testing.T) {
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/1 nodes are available: 1 No victims found on node node1 for preemptor pod p."),
},
{
Expand All @@ -191,7 +191,7 @@ func TestPostFilter(t *testing.T) {
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/1 nodes are available: 1 Preemption is not helpful for scheduling."),
},
{
Expand All @@ -209,7 +209,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: &framework.PostFilterResult{NominatedNodeName: "node2"},
wantResult: framework.NewPostFilterResultWithNominatedNode("node2"),
wantStatus: framework.NewStatus(framework.Success),
},
{
Expand All @@ -227,10 +227,8 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
extender: &st.FakeExtender{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
wantResult: &framework.PostFilterResult{
NominatedNodeName: "node1",
},
extender: &st.FakeExtender{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
wantResult: framework.NewPostFilterResultWithNominatedNode("node1"),
wantStatus: framework.NewStatus(framework.Success),
},
{
Expand All @@ -248,7 +246,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 2 Insufficient cpu."),
},
{
Expand All @@ -266,7 +264,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/2 nodes are available: 1 Insufficient cpu, 1 No victims found on node node1 for preemptor pod p."),
},
{
Expand All @@ -286,7 +284,7 @@ func TestPostFilter(t *testing.T) {
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable),
"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable),
},
wantResult: nil,
wantResult: framework.NewPostFilterResultWithNominatedNode(""),
wantStatus: framework.NewStatus(framework.Unschedulable, "0/4 nodes are available: 2 Insufficient cpu, 2 Preemption is not helpful for scheduling."),
},
{
Expand Down Expand Up @@ -317,9 +315,7 @@ func TestPostFilter(t *testing.T) {
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: &framework.PostFilterResult{
NominatedNodeName: "node2",
},
wantResult: framework.NewPostFilterResultWithNominatedNode("node2"),
wantStatus: framework.NewStatus(framework.Success),
},
}
Expand Down Expand Up @@ -1465,7 +1461,7 @@ func TestPreempt(t *testing.T) {
extenders []*st.FakeExtender
nodeNames []string
registerPlugin st.RegisterPluginFunc
expectedNode string
want *framework.PostFilterResult
expectedPods []string // list of preempted pods
}{
{
Expand All @@ -1479,7 +1475,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node1", "node2", "node3"},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
{
Expand All @@ -1497,7 +1493,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node-a/zone1", "node-b/zone1", "node-x/zone2"},
registerPlugin: st.RegisterPluginAsExtensions(podtopologyspread.Name, podtopologyspread.New, "PreFilter", "Filter"),
expectedNode: "node-b",
want: framework.NewPostFilterResultWithNominatedNode("node-b"),
expectedPods: []string{"p-b1"},
},
{
Expand All @@ -1514,7 +1510,7 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
{
Expand All @@ -1530,7 +1526,7 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.FalsePredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "",
want: nil,
expectedPods: []string{},
},
{
Expand All @@ -1547,7 +1543,7 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.Node1PredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
{
Expand All @@ -1564,8 +1560,8 @@ func TestPreempt(t *testing.T) {
{Predicates: []st.FitPredicate{st.TruePredicateExtender}},
},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
//sum of priorities of all victims on node1 is larger than node2, node2 is chosen.
expectedNode: "node2",
// sum of priorities of all victims on node1 is larger than node2, node2 is chosen.
want: framework.NewPostFilterResultWithNominatedNode("node2"),
expectedPods: []string{"p2.1"},
},
{
Expand All @@ -1579,7 +1575,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node1", "node2", "node3"},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "",
want: nil,
expectedPods: nil,
},
{
Expand All @@ -1593,7 +1589,7 @@ func TestPreempt(t *testing.T) {
},
nodeNames: []string{"node1", "node2", "node3"},
registerPlugin: st.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"),
expectedNode: "node1",
want: framework.NewPostFilterResultWithNominatedNode("node1"),
expectedPods: []string{"p1.1", "p1.2"},
},
}
Expand Down Expand Up @@ -1691,11 +1687,8 @@ func TestPreempt(t *testing.T) {
if !status.IsSuccess() && !status.IsUnschedulable() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
if res != nil && len(res.NominatedNodeName) != 0 && res.NominatedNodeName != test.expectedNode {
t.Errorf("expected node: %v, got: %v", test.expectedNode, res.NominatedNodeName)
}
if res != nil && len(res.NominatedNodeName) == 0 && len(test.expectedNode) != 0 {
t.Errorf("expected node: %v, got: nothing", test.expectedNode)
if diff := cmp.Diff(test.want, res); diff != "" {
t.Errorf("Unexpected status (-want, +got):\n%s", diff)
}
if len(deletedPodNames) != len(test.expectedPods) {
t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(deletedPodNames))
Expand All @@ -1712,7 +1705,7 @@ func TestPreempt(t *testing.T) {
t.Errorf("pod %v is not expected to be a victim.", victimName)
}
}
if res != nil {
if res != nil && res.NominatingInfo != nil {
test.pod.Status.NominatedNodeName = res.NominatedNodeName
}

Expand All @@ -1730,7 +1723,7 @@ func TestPreempt(t *testing.T) {
if !status.IsSuccess() && !status.IsUnschedulable() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
if res != nil && len(deletedPodNames) > 0 {
if res != nil && res.NominatingInfo != nil && len(deletedPodNames) > 0 {
t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", res.NominatedNodeName)
}
})
Expand Down
18 changes: 15 additions & 3 deletions pkg/scheduler/framework/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,19 @@ type Evaluator struct {
Interface
}

// Preempt returns a PostFilterResult carrying suggested nominatedNodeName, along with a Status.
// The semantics of returned <PostFilterResult, Status> varies on different scenarios:
// - <nil, Error>. This denotes it's a transient/rare error that may be self-healed in future cycles.
// - <nil, Unschedulable>. This status is mostly as expected like the preemptor is waiting for the
// victims to be fully terminated.
// - In both cases above, a nil PostFilterResult is returned to keep the pod's nominatedNodeName unchanged.
//
// - <non-nil PostFilterResult, Unschedulable>. It indicates the pod cannot be scheduled even with preemption.
// In this case, a non-nil PostFilterResult is returned and result.NominatingMode instructs how to deal with
// the nominatedNodeName.
// - <non-nil PostFilterResult}, Success>. It's the regular happy path
// and the non-empty nominatedNodeName will be applied to the preemptor pod.
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {

// 0) Fetch the latest version of <pod>.
// It's safe to directly fetch pod here. Because the informer cache has already been
// initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc().
Expand Down Expand Up @@ -158,7 +169,8 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
// Leave FailedPlugins as nil as it won't be used on moving Pods.
},
}
return nil, framework.NewStatus(framework.Unschedulable, fitError.Error())
// Specify nominatedNodeName to clear the pod's nominatedNodeName status, if applicable.
return framework.NewPostFilterResultWithNominatedNode(""), framework.NewStatus(framework.Unschedulable, fitError.Error())
}

// 3) Interact with registered Extenders to filter out some candidates if needed.
Expand All @@ -178,7 +190,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
return nil, status
}

return &framework.PostFilterResult{NominatedNodeName: bestCandidate.Name()}, framework.NewStatus(framework.Success)
return framework.NewPostFilterResultWithNominatedNode(bestCandidate.Name()), framework.NewStatus(framework.Success)
}

// FindCandidates calculates a slice of preemption candidates.
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/framework/runtime/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,18 +711,22 @@ func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framewo
}()

statuses := make(framework.PluginToStatus)
// `result` records the last meaningful(non-noop) PostFilterResult.
var result *framework.PostFilterResult
for _, pl := range f.postFilterPlugins {
r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
if s.IsSuccess() {
return r, s
} else if !s.IsUnschedulable() {
// Any status other than Success or Unschedulable is Error.
return nil, framework.AsStatus(s.AsError())
} else if r != nil && r.Mode() != framework.ModeNoop {
Huang-Wei marked this conversation as resolved.
Show resolved Hide resolved
result = r
}
statuses[pl.Name()] = s
}

return nil, statuses.Merge()
return result, statuses.Merge()
}

func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/scheduler/framework/runtime/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ func TestRunScorePlugins(t *testing.T) {
},
{
name: "single ScoreWithNormalize plugin",
//registry: registry,
// registry: registry,
plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1),
pluginConfigs: []config.PluginConfig{
{
Expand Down Expand Up @@ -1596,7 +1596,9 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {

podNominator := internalqueue.NewPodNominator(nil)
if tt.nominatedPod != nil {
podNominator.AddNominatedPod(framework.NewPodInfo(tt.nominatedPod), nodeName)
podNominator.AddNominatedPod(
framework.NewPodInfo(tt.nominatedPod),
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: nodeName})
}
profile := config.KubeSchedulerProfile{Plugins: cfgPls}
f, err := newFrameworkWithQueueSortAndBind(registry, profile, WithPodNominator(podNominator))
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/generic_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,8 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
t.Fatal(err)
}
fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1")
fwk.AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}),
&framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"})

_, _, err = scheduler.findNodesThatFitPod(context.Background(), nil, fwk, framework.NewCycleState(), test.pod)

Expand Down
Loading