From 83edb7a554920f707b31e823d467c3f2c23884a3 Mon Sep 17 00:00:00 2001 From: Thor <1187526662@qq.com> Date: Tue, 28 Jul 2020 11:28:09 +0800 Subject: [PATCH 1/3] fix bug of queue capability lose efficacy Signed-off-by: Thor <1187526662@qq.com> --- pkg/scheduler/actions/enqueue/enqueue.go | 8 +++++++- pkg/scheduler/framework/session.go | 17 ++++++++++------- pkg/scheduler/plugins/proportion/proportion.go | 3 +-- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/pkg/scheduler/actions/enqueue/enqueue.go b/pkg/scheduler/actions/enqueue/enqueue.go index 6e76c95a44..52778f6ee5 100644 --- a/pkg/scheduler/actions/enqueue/enqueue.go +++ b/pkg/scheduler/actions/enqueue/enqueue.go @@ -57,7 +57,6 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { queues := util.NewPriorityQueue(ssn.QueueOrderFn) queueMap := map[api.QueueID]*api.QueueInfo{} - jobsMap := map[api.QueueID]*util.PriorityQueue{} for _, job := range ssn.Jobs { @@ -72,6 +71,7 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { queueMap[queue.UID] = queue queues.Push(queue) + ssn.InqueueJobResource[queue.UID] = api.EmptyResource() } } @@ -82,6 +82,11 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { klog.V(3).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue) jobsMap[job.Queue].Push(job) } + + if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue { + klog.V(3).Infof("Added Job <%s/%s> into InqueueResource", job.Namespace, job.Name, job.Queue) + ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources)) + } } klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap)) @@ -128,6 +133,7 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { if inqueue { job.PodGroup.Status.Phase = scheduling.PodGroupInqueue ssn.Jobs[job.UID] = job + ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources)) } // Added Queue back until no job in Queue. diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index b74c48781a..5648d79193 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -40,10 +40,11 @@ type Session struct { podGroupStatus map[api.JobID]*scheduling.PodGroupStatus - Jobs map[api.JobID]*api.JobInfo - Nodes map[string]*api.NodeInfo - Queues map[api.QueueID]*api.QueueInfo - NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo + Jobs map[api.JobID]*api.JobInfo + Nodes map[string]*api.NodeInfo + Queues map[api.QueueID]*api.QueueInfo + NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo + InqueueJobResource map[api.QueueID]*api.Resource Backlog []*api.JobInfo Tiers []conf.Tier @@ -76,9 +77,10 @@ func openSession(cache cache.Cache) *Session { podGroupStatus: map[api.JobID]*scheduling.PodGroupStatus{}, - Jobs: map[api.JobID]*api.JobInfo{}, - Nodes: map[string]*api.NodeInfo{}, - Queues: map[api.QueueID]*api.QueueInfo{}, + Jobs: map[api.JobID]*api.JobInfo{}, + Nodes: map[string]*api.NodeInfo{}, + Queues: map[api.QueueID]*api.QueueInfo{}, + InqueueJobResource: map[api.QueueID]*api.Resource{}, plugins: map[string]Plugin{}, jobOrderFns: map[string]api.CompareFn{}, @@ -150,6 +152,7 @@ func closeSession(ssn *Session) { ssn.jobOrderFns = nil ssn.namespaceOrderFns = nil ssn.queueOrderFns = nil + ssn.InqueueJobResource = nil klog.V(3).Infof("Close Session %v", ssn.UID) } diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index a96881724e..a2db67eaac 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -18,7 +18,6 @@ package proportion import ( "k8s.io/klog" - "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/api/helpers" "volcano.sh/volcano/pkg/scheduler/framework" @@ -226,7 +225,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { minReq := api.NewResource(*job.PodGroup.Spec.MinResources) // The queue resource quota limit has not reached - return minReq.Add(attr.allocated).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) + return minReq.Add(attr.allocated).Add(ssn.InqueueJobResource[job.Queue]).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) }) // Register event handlers. From 5bd8be35962795431920962c136d33dab057799e Mon Sep 17 00:00:00 2001 From: Thor <1187526662@qq.com> Date: Tue, 28 Jul 2020 18:01:19 +0800 Subject: [PATCH 2/3] remove redudant argument Signed-off-by: Thor <1187526662@qq.com> --- pkg/scheduler/actions/enqueue/enqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/actions/enqueue/enqueue.go b/pkg/scheduler/actions/enqueue/enqueue.go index 52778f6ee5..a4982d7d06 100644 --- a/pkg/scheduler/actions/enqueue/enqueue.go +++ b/pkg/scheduler/actions/enqueue/enqueue.go @@ -84,7 +84,7 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { } if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue { - klog.V(3).Infof("Added Job <%s/%s> into InqueueResource", job.Namespace, job.Name, job.Queue) + klog.V(3).Infof("Added Job <%s/%s> into InqueueResource", job.Namespace, job.Name) ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources)) } } From ca2038bd7a805f61a342a9a9e7f7dcb8c903214d Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Wed, 29 Jul 2020 15:09:23 +0800 Subject: [PATCH 3/3] Record Inqueue job resource request in queueAttr --- pkg/scheduler/actions/enqueue/enqueue.go | 7 ------- pkg/scheduler/framework/session.go | 17 +++++++---------- .../plugins/proportion/proportion.go | 19 +++++++++++++++---- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pkg/scheduler/actions/enqueue/enqueue.go b/pkg/scheduler/actions/enqueue/enqueue.go index a4982d7d06..b230b460e5 100644 --- a/pkg/scheduler/actions/enqueue/enqueue.go +++ b/pkg/scheduler/actions/enqueue/enqueue.go @@ -71,7 +71,6 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { queueMap[queue.UID] = queue queues.Push(queue) - ssn.InqueueJobResource[queue.UID] = api.EmptyResource() } } @@ -82,11 +81,6 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { klog.V(3).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue) jobsMap[job.Queue].Push(job) } - - if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue { - klog.V(3).Infof("Added Job <%s/%s> into InqueueResource", job.Namespace, job.Name) - ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources)) - } } klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap)) @@ -133,7 +127,6 @@ func (enqueue *enqueueAction) Execute(ssn *framework.Session) { if inqueue { job.PodGroup.Status.Phase = scheduling.PodGroupInqueue ssn.Jobs[job.UID] = job - ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources)) } // Added Queue back until no job in Queue. diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 5648d79193..b74c48781a 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -40,11 +40,10 @@ type Session struct { podGroupStatus map[api.JobID]*scheduling.PodGroupStatus - Jobs map[api.JobID]*api.JobInfo - Nodes map[string]*api.NodeInfo - Queues map[api.QueueID]*api.QueueInfo - NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo - InqueueJobResource map[api.QueueID]*api.Resource + Jobs map[api.JobID]*api.JobInfo + Nodes map[string]*api.NodeInfo + Queues map[api.QueueID]*api.QueueInfo + NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo Backlog []*api.JobInfo Tiers []conf.Tier @@ -77,10 +76,9 @@ func openSession(cache cache.Cache) *Session { podGroupStatus: map[api.JobID]*scheduling.PodGroupStatus{}, - Jobs: map[api.JobID]*api.JobInfo{}, - Nodes: map[string]*api.NodeInfo{}, - Queues: map[api.QueueID]*api.QueueInfo{}, - InqueueJobResource: map[api.QueueID]*api.Resource{}, + Jobs: map[api.JobID]*api.JobInfo{}, + Nodes: map[string]*api.NodeInfo{}, + Queues: map[api.QueueID]*api.QueueInfo{}, plugins: map[string]Plugin{}, jobOrderFns: map[string]api.CompareFn{}, @@ -152,7 +150,6 @@ func closeSession(ssn *Session) { ssn.jobOrderFns = nil ssn.namespaceOrderFns = nil ssn.queueOrderFns = nil - ssn.InqueueJobResource = nil klog.V(3).Infof("Close Session %v", ssn.UID) } diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index a2db67eaac..3d7c0f61ff 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -18,6 +18,8 @@ package proportion import ( "k8s.io/klog" + + "volcano.sh/volcano/pkg/apis/scheduling" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/api/helpers" "volcano.sh/volcano/pkg/scheduler/framework" @@ -42,6 +44,8 @@ type queueAttr struct { deserved *api.Resource allocated *api.Resource request *api.Resource + // inqueue represents the resource request of the inqueue job + inqueue *api.Resource } // New return proportion action @@ -68,7 +72,6 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { // Build attributes for Queues. for _, job := range ssn.Jobs { klog.V(4).Infof("Considering Job <%s/%s>.", job.Namespace, job.Name) - if _, found := pp.queueOpts[job.Queue]; !found { queue := ssn.Queues[job.Queue] attr := &queueAttr{ @@ -79,25 +82,29 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { deserved: api.EmptyResource(), allocated: api.EmptyResource(), request: api.EmptyResource(), + inqueue: api.EmptyResource(), } pp.queueOpts[job.Queue] = attr klog.V(4).Infof("Added Queue <%s> attributes.", job.Queue) } + attr := pp.queueOpts[job.Queue] for status, tasks := range job.TaskStatusIndex { if api.AllocatedStatus(status) { for _, t := range tasks { - attr := pp.queueOpts[job.Queue] attr.allocated.Add(t.Resreq) attr.request.Add(t.Resreq) } } else if status == api.Pending { for _, t := range tasks { - attr := pp.queueOpts[job.Queue] attr.request.Add(t.Resreq) } } } + + if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue { + attr.inqueue.Add(api.NewResource(*job.PodGroup.Spec.MinResources)) + } } remaining := pp.totalResource.Clone() @@ -225,7 +232,11 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { minReq := api.NewResource(*job.PodGroup.Spec.MinResources) // The queue resource quota limit has not reached - return minReq.Add(attr.allocated).Add(ssn.InqueueJobResource[job.Queue]).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) + inqueue := minReq.Add(attr.allocated).Add(attr.inqueue).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) + if inqueue { + attr.inqueue.Add(api.NewResource(*job.PodGroup.Spec.MinResources)) + } + return inqueue }) // Register event handlers.