Skip to content

Commit

Permalink
support fair share
Browse files Browse the repository at this point in the history
  • Loading branch information
lminzhw committed Jul 19, 2019
1 parent 4492e66 commit b754fd2
Show file tree
Hide file tree
Showing 12 changed files with 652 additions and 93 deletions.
82 changes: 73 additions & 9 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
defer glog.V(3).Infof("Leaving Allocate ...")

queues := util.NewPriorityQueue(ssn.QueueOrderFn)
// map[queueId]PriorityQueue(*api.JobInfo)
jobsMap := map[api.QueueID]*util.PriorityQueue{}
// map[queueId]PriorityQueue(namespaceName)
namespaceMap := map[api.QueueID]*util.PriorityQueue{}
// map[queueId]map[namespaceName]PriorityQueue(*api.JobInfo)
jobInNamespaceMap := map[api.QueueID]map[string]*util.PriorityQueue{}

namespaceOrderEnabled := ssn.NamespaceOrderEnabled()

for _, job := range ssn.Jobs {
if job.PodGroup.Status.Phase == api.PodGroupPending {
Expand All @@ -62,12 +69,37 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
continue
}

// ignore namespace order enabled or not, just add key in jobsMap
if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}

glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
jobsMap[job.Queue].Push(job)
if !namespaceOrderEnabled {
glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
jobsMap[job.Queue].Push(job)
continue
}

if _, found := namespaceMap[job.Queue]; !found {
namespaceMap[job.Queue] = util.NewPriorityQueue(ssn.NamespaceOrderFn)
}

namespaceJob, found := jobInNamespaceMap[job.Queue]
if !found {
namespaceJob = make(map[string]*util.PriorityQueue)
jobInNamespaceMap[job.Queue] = namespaceJob
}

jobs, found := namespaceJob[job.Namespace]
if !found {
jobs = util.NewPriorityQueue(ssn.JobOrderFn)
namespaceJob[job.Namespace] = jobs

namespaceMap[job.Queue].Push(job.Namespace)
}

glog.V(4).Infof("Added Job <%s/%s> into Queue <%s> Namespace <%s>", job.Namespace, job.Name, job.Queue, job.Namespace)
jobs.Push(job)
}

glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
Expand Down Expand Up @@ -102,16 +134,44 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
continue
}

jobs, found := jobsMap[queue.UID]

glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name)

if !found || jobs.Empty() {
glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
continue
var jobQueue *util.PriorityQueue
var namespace string
var namespaceQueue *util.PriorityQueue

if !namespaceOrderEnabled {
jobs, found := jobsMap[queue.UID]
if !found || jobs.Empty() {
glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
continue
}
jobQueue = jobs
} else {
namespaces, found := namespaceMap[queue.UID]
if !found || namespaces.Empty() {
glog.V(4).Infof("Can not find namespace for queue %s.", queue.Name)
continue
}
namespaceQueue = namespaces
namespace = namespaces.Pop().(string)

glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>, namespace <%v>", queue.Name, namespace)

namespacesInQueue, found := jobInNamespaceMap[queue.UID]
if !found {
glog.V(4).Infof("Can not find namespace %s for queue %s", namespace, queue.Name)
continue
}
namespaceJob, found := namespacesInQueue[namespace]
if !found || namespaceJob.Empty() {
glog.V(4).Infof("Can not find job for queue %s, namespace %s.", queue.Name, namespace)
continue
}
jobQueue = namespaceJob
}

job := jobs.Pop().(*api.JobInfo)
job := jobQueue.Pop().(*api.JobInfo)
if _, found := pendingTasks[job.UID]; !found {
tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
Expand Down Expand Up @@ -183,7 +243,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
}

if ssn.JobReady(job) {
jobs.Push(job)
jobQueue.Push(job)
break
}
}
Expand All @@ -193,6 +253,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
} else {
stmt.Discard()
}

if namespaceOrderEnabled {
namespaceQueue.Push(namespace)
}
// Added Queue back until no job in Queue.
queues.Push(queue)
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/scheduler/api/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import "fmt"

// ClusterInfo is a snapshot of cluster by cache.
type ClusterInfo struct {
Jobs map[JobID]*JobInfo
Nodes map[string]*NodeInfo
Queues map[QueueID]*QueueInfo
Jobs map[JobID]*JobInfo
Nodes map[string]*NodeInfo
Queues map[QueueID]*QueueInfo
NamespaceInfo map[string]*NamespaceInfo
}

func (ci ClusterInfo) String() string {
Expand Down Expand Up @@ -57,5 +58,13 @@ func (ci ClusterInfo) String() string {
}
}

if len(ci.NamespaceInfo) != 0 {
str = str + "Namespaces:\n"
for _, ns := range ci.NamespaceInfo {
str = str + fmt.Sprintf("\t Namespace(%s) Weight(%v)\n",
ns.Name, ns.Weight)
}
}

return str
}
153 changes: 153 additions & 0 deletions pkg/scheduler/api/namespace_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
"fmt"
"sync"

"github.com/golang/glog"

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)

const (
// QuotaKey is the key in ResourceQuota.spec.hard indicating the weight of thi namespace
QuotaKey = "volcano.sh/namespace.weight"
// DefaultWeight is the default weight of namespace
DefaultWeight = 1
)

// NamespaceInfo records information of namespace
type NamespaceInfo struct {
Name string
// Weight is the highest weight among many ResourceQuota.
Weight int64
}

// GetWeight returns weight of a namespace, any invalid case would get default value
func (n *NamespaceInfo) GetWeight() int64 {
if n == nil {
return DefaultWeight
}
if n.Weight == 0 {
return DefaultWeight
}
return n.Weight
}

type quotaItem struct {
name string
weight int64
}

func quotaItemKeyFunc(obj interface{}) (string, error) {
item, ok := obj.(*quotaItem)
if !ok {
return "", fmt.Errorf("obj with type %T could not parse", obj)
}
return item.name, nil
}

// for big root heap
func quotaItemLessFunc(a interface{}, b interface{}) bool {
A := a.(*quotaItem)
B := b.(*quotaItem)
return A.weight > B.weight
}

// NamespaceCollection will record all details about namespace
type NamespaceCollection struct {
Name string

weightMu sync.Mutex
quotaWeight *cache.Heap
}

// NewNamespaceCollection creates new NamespaceCollection object to record all information about a namespace
func NewNamespaceCollection(name string) *NamespaceCollection {
n := &NamespaceCollection{
Name: name,
quotaWeight: cache.NewHeap(quotaItemKeyFunc, quotaItemLessFunc),
}
// add at least one item into quotaWeight.
// Because cache.Heap.Pop would be blocked until queue is not empty
n.updateWeight(&quotaItem{
name: QuotaKey,
weight: DefaultWeight,
})
return n
}

func (n *NamespaceCollection) deleteWeight(q *quotaItem) {
n.weightMu.Lock()
n.quotaWeight.Delete(q)
n.weightMu.Unlock()
}

func (n *NamespaceCollection) updateWeight(q *quotaItem) {
n.weightMu.Lock()
n.quotaWeight.Update(q)
n.weightMu.Unlock()
}

func itemFromQuota(quota *v1.ResourceQuota) *quotaItem {
var weight int64 = DefaultWeight

quotaWeight, ok := quota.Spec.Hard[QuotaKey]
if ok {
weight = quotaWeight.Value()
}

item := &quotaItem{
name: quota.Name,
weight: weight,
}
return item
}

// Update modify the registered information according quota object
func (n *NamespaceCollection) Update(quota *v1.ResourceQuota) {
n.updateWeight(itemFromQuota(quota))
}

// Delete remove the registered information according quota object
func (n *NamespaceCollection) Delete(quota *v1.ResourceQuota) {
n.deleteWeight(itemFromQuota(quota))
}

// Snapshot will clone a NamespaceInfo without Heap according NamespaceCollection
func (n *NamespaceCollection) Snapshot() *NamespaceInfo {
var weight int64 = DefaultWeight

n.weightMu.Lock()
obj, err := n.quotaWeight.Pop()
if err != nil {
glog.Warningf("namespace %s, quota weight meets error %v when pop", n.Name, err)
} else {
item := obj.(*quotaItem)
weight = item.weight
n.quotaWeight.Add(item)
}
n.weightMu.Unlock()

return &NamespaceInfo{
Name: n.Name,
Weight: weight,
}
}
57 changes: 57 additions & 0 deletions pkg/scheduler/api/namespace_info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package api

import (
"testing"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

func newQuota(name string, weight int) *v1.ResourceQuota {
q := &v1.ResourceQuota{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.ResourceQuotaSpec{
Hard: make(v1.ResourceList),
},
}

if weight >= 0 {
q.Spec.Hard[v1.ResourceName(QuotaKey)] = *resource.NewQuantity(int64(weight), resource.DecimalSI)
}

return q
}

func TestNamespaceCollection(t *testing.T) {
c := NewNamespaceCollection("testCollection")
c.Update(newQuota("abc", 123))
c.Update(newQuota("abc", 456))
c.Update(newQuota("def", -1))
c.Update(newQuota("def", 16))
c.Update(newQuota("ghi", 0))

info := c.Snapshot()
if info.Weight != 456 {
t.Errorf("weight of namespace should be %d, but not %d", 456, info.Weight)
}

c.Delete(newQuota("abc", 0))

info = c.Snapshot()
if info.Weight != 16 {
t.Errorf("weight of namespace should be %d, but not %d", 16, info.Weight)
}

c.Delete(newQuota("abc", 0))
c.Delete(newQuota("def", 15))
c.Delete(newQuota("ghi", -1))

info = c.Snapshot()
if info.Weight != 1 {
t.Errorf("weight of namespace should be default weight %d, but not %d", DefaultWeight, info.Weight)
}
}
Loading

0 comments on commit b754fd2

Please sign in to comment.