Skip to content

Commit

Permalink
scheduler: add quota inplace
Browse files Browse the repository at this point in the history
Signed-off-by: chuanyun.lcy <chuanyun.lcy@alibaba-inc.com>
  • Loading branch information
chuanyun.lcy committed Jan 7, 2025
1 parent b3ea5b6 commit 6dd540e
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 8 deletions.
32 changes: 32 additions & 0 deletions pkg/scheduler/frameworkext/helper/forcesync_eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,35 @@ func ForceSyncFromInformer(stopCh <-chan struct{}, cacheSyncer CacheSyncer, info
syncEventHandler.syncDone()
return registration, err
}

func ForceSyncFromInformerWithReplace(stopCh <-chan struct{}, cacheSyncer CacheSyncer, informer cache.SharedInformer, handler cache.ResourceEventHandler, replaceHandler func([]interface{}) error, options ...Option) (cache.ResourceEventHandlerRegistration, error) {
syncEventHandler := newForceSyncEventHandler(handler, options...)
registration, err := informer.AddEventHandlerWithResyncPeriod(syncEventHandler, syncEventHandler.resyncPeriod)
if err != nil {
return nil, err
}

if cacheSyncer != nil {
cacheSyncer.Start(stopCh)
cacheSyncer.WaitForCacheSync(stopCh)
}

allObjects := informer.GetStore().List()
// record object uid.
for _, obj := range allObjects {
if metaAccessor, ok := obj.(metav1.ObjectMetaAccessor); ok {
objectMeta := metaAccessor.GetObjectMeta()
resourceVersion, err := strconv.ParseInt(objectMeta.GetResourceVersion(), 10, 64)
if err == nil {
syncEventHandler.objects[objectMeta.GetUID()] = resourceVersion
}
}
}
// replace objects
err = replaceHandler(allObjects)
if err != nil {
return nil, err
}
syncEventHandler.syncDone()
return registration, err
}
49 changes: 49 additions & 0 deletions pkg/scheduler/frameworkext/helper/forcesync_eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,52 @@ func TestSyncedEventHandler(t *testing.T) {
}, wait.NeverStop)
assert.NoError(t, err)
}

func TestSyncedEventHandlerWithReplace(t *testing.T) {
var objects []runtime.Object
for i := 0; i < 10; i++ {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
UID: uuid.NewUUID(),
Name: fmt.Sprintf("node-%d", i),
ResourceVersion: fmt.Sprintf("%d", i+1),
},
}
objects = append(objects, node)
}
fakeClientSet := kubefake.NewSimpleClientset(objects...)
sharedInformerFactory := informers.NewSharedInformerFactory(fakeClientSet, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes()
addTimes := map[string]int{}
var wg sync.WaitGroup
wg.Add(10)
ForceSyncFromInformerWithReplace(context.TODO().Done(), sharedInformerFactory, nodeInformer.Informer(), cache.ResourceEventHandlerFuncs{}, func(objs []interface{}) error {
for _, obj := range objs {
node := obj.(*corev1.Node)
addTimes[node.Name]++
wg.Done()
}
return nil
})
wg.Wait()
for _, v := range addTimes {
if v > 1 {
t.Errorf("unexpected add times, want 1 but got %d", v)
break
}
}
node, err := nodeInformer.Lister().Get("node-0")
assert.NoError(t, err)
assert.NotNil(t, node)
node = node.DeepCopy()
node.ResourceVersion = "100"
_, err = fakeClientSet.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
assert.NoError(t, err)
err = wait.PollUntil(1*time.Second, func() (done bool, err error) {
node, err := nodeInformer.Lister().Get("node-0")
assert.NoError(t, err)
assert.NotNil(t, node)
return node.ResourceVersion == "100", nil
}, wait.NeverStop)
assert.NoError(t, err)
}
84 changes: 78 additions & 6 deletions pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,8 @@ func (gqm *GroupQuotaManager) UpdateQuota(quota *v1alpha1.ElasticQuota, isDelete
}
localQuotaInfo.updateQuotaInfoFromRemote(newQuotaInfo)
} else {
// TODO: inplace add
gqm.quotaInfoMap[quotaName] = newQuotaInfo
gqm.updateQuotaInternalNoLock(newQuotaInfo, nil)
return nil
}
}

Expand All @@ -417,10 +417,25 @@ func (gqm *GroupQuotaManager) UpdateQuota(quota *v1alpha1.ElasticQuota, isDelete
return nil
}

func (gqm *GroupQuotaManager) UpdateQuotaInfo(quota *v1alpha1.ElasticQuota) {
gqm.hierarchyUpdateLock.Lock()
defer gqm.hierarchyUpdateLock.Unlock()

newQuotaInfo := NewQuotaInfoFromQuota(quota)
gqm.quotaInfoMap[quota.Name] = newQuotaInfo
}

func (gqm *GroupQuotaManager) ResetQuota() {
gqm.hierarchyUpdateLock.Lock()
defer gqm.hierarchyUpdateLock.Unlock()

gqm.resetQuotaNoLock()
}

func (gqm *GroupQuotaManager) resetQuotaNoLock() {
start := time.Now()
defer func() {
klog.Infof("reset quota tree %v take %v", time.Since(start))
klog.Infof("reset quota tree %v take %v", gqm.treeID, time.Since(start))
}()
// rebuild gqm.quotaTopoNodeMap
gqm.rebuildQuotaTopoNodeMapNoLock()
Expand Down Expand Up @@ -981,23 +996,80 @@ func (gqm *GroupQuotaManager) recursiveUpdateGroupTreeWithDeltaAllocated(deltaAl
}

func (gqm *GroupQuotaManager) updateQuotaInternalNoLock(newQuotaInfo, oldQuotaInfo *QuotaInfo) {
// update topogy node map
gqm.updateQuotaTopoNodeNoLock(newQuotaInfo, oldQuotaInfo)

// update quota info map
if oldQuotaInfo == nil {
gqm.runtimeQuotaCalculatorMap[newQuotaInfo.Name] = NewRuntimeQuotaCalculator(newQuotaInfo.Name)
if gqm.runtimeQuotaCalculatorMap[newQuotaInfo.ParentName] == nil {
gqm.runtimeQuotaCalculatorMap[newQuotaInfo.ParentName] = NewRuntimeQuotaCalculator(newQuotaInfo.ParentName)
}
gqm.quotaInfoMap[newQuotaInfo.Name] = newQuotaInfo
}

// update resource keys
gqm.updateResourceKeyNoLock()

oldMax := v1.ResourceList{}
if oldQuotaInfo != nil {
oldMax = oldQuotaInfo.CalculateInfo.Max
}
// max changed
if !quotav1.Equals(newQuotaInfo.CalculateInfo.Max, oldQuotaInfo.CalculateInfo.Max) {
if !quotav1.Equals(newQuotaInfo.CalculateInfo.Max, oldMax) {
klog.V(4).Infof("[updateQuotaInternalNoLock] quota %v max change, oldMax: %v, newMax: %v",
newQuotaInfo.Name, util.DumpJSON(oldMax), util.DumpJSON(newQuotaInfo.CalculateInfo.Max))
gqm.doUpdateOneGroupMaxQuotaNoLock(newQuotaInfo.Name, newQuotaInfo.CalculateInfo.Max)
}

oldMin := v1.ResourceList{}
if oldQuotaInfo != nil {
oldMin = oldQuotaInfo.CalculateInfo.Min
}
// min changed
if !quotav1.Equals(newQuotaInfo.CalculateInfo.Min, oldQuotaInfo.CalculateInfo.Min) {
if !quotav1.Equals(newQuotaInfo.CalculateInfo.Min, oldMin) {
klog.V(4).Infof("[updateQuotaInternalNoLock] quota %v min change, oldMin: %v, newMin: %v",
newQuotaInfo.Name, util.DumpJSON(oldMin), util.DumpJSON(newQuotaInfo.CalculateInfo.Min))
gqm.doUpdateOneGroupMinQuotaNoLock(newQuotaInfo.Name, newQuotaInfo.CalculateInfo.Min)
}

oldSharedWeight := v1.ResourceList{}
if oldQuotaInfo != nil {
oldSharedWeight = oldQuotaInfo.CalculateInfo.SharedWeight
}
// sharedweight changed
if !quotav1.Equals(newQuotaInfo.CalculateInfo.SharedWeight, oldQuotaInfo.CalculateInfo.SharedWeight) {
if !quotav1.Equals(newQuotaInfo.CalculateInfo.SharedWeight, oldSharedWeight) {
klog.V(4).Infof("[updateQuotaInternalNoLock] quota %v sharedWeight change, oldMin: %v, newMin: %v",
newQuotaInfo.Name, util.DumpJSON(oldSharedWeight), util.DumpJSON(newQuotaInfo.CalculateInfo.SharedWeight))
gqm.doUpdateOneGroupSharedWeightNoLock(newQuotaInfo.Name, newQuotaInfo.CalculateInfo.SharedWeight)
}

}

func (gqm *GroupQuotaManager) updateQuotaTopoNodeNoLock(newQuotaInfo, oldQuotaInfo *QuotaInfo) {
if oldQuotaInfo != nil {
parentNode, ok := gqm.quotaTopoNodeMap[oldQuotaInfo.ParentName]
if ok {
delete(parentNode.childGroupQuotaInfos, oldQuotaInfo.Name)
}
}

node, ok := gqm.quotaTopoNodeMap[newQuotaInfo.Name]
if !ok {
node = NewQuotaTopoNode(newQuotaInfo.Name, newQuotaInfo)
} else {
node.quotaInfo = newQuotaInfo
}

parentNode, ok := gqm.quotaTopoNodeMap[newQuotaInfo.ParentName]
if !ok {
parentNode = NewQuotaTopoNode(newQuotaInfo.ParentName, &QuotaInfo{
Name: newQuotaInfo.ParentName,
})
}
parentNode.childGroupQuotaInfos[newQuotaInfo.Name] = node
}

func (gqm *GroupQuotaManager) doUpdateOneGroupMaxQuotaNoLock(quotaName string, newMax v1.ResourceList) {
curToAllParInfos := gqm.getCurToAllParentGroupQuotaInfoNoLock(quotaName)
quotaInfoLen := len(curToAllParInfos)
Expand Down
7 changes: 5 additions & 2 deletions pkg/scheduler/plugins/elasticquota/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,14 @@ func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error)
elasticQuota.createRootQuotaIfNotPresent()
elasticQuota.createSystemQuotaIfNotPresent()
elasticQuota.createDefaultQuotaIfNotPresent()
frameworkexthelper.ForceSyncFromInformer(ctx.Done(), scheSharedInformerFactory, informer, cache.ResourceEventHandlerFuncs{
_, err := frameworkexthelper.ForceSyncFromInformerWithReplace(ctx.Done(), scheSharedInformerFactory, informer, cache.ResourceEventHandlerFuncs{
AddFunc: elasticQuota.OnQuotaAdd,
UpdateFunc: elasticQuota.OnQuotaUpdate,
DeleteFunc: elasticQuota.OnQuotaDelete,
})
}, elasticQuota.ReplaceQuotas)
if err != nil {
return nil, err
}

nodeInformer := handle.SharedInformerFactory().Core().V1().Nodes().Informer()
frameworkexthelper.ForceSyncFromInformer(ctx.Done(), handle.SharedInformerFactory(), nodeInformer, cache.ResourceEventHandlerFuncs{
Expand Down
38 changes: 38 additions & 0 deletions pkg/scheduler/plugins/elasticquota/quota_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package elasticquota

import (
"encoding/json"
"time"

corev1 "k8s.io/api/core/v1"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
Expand Down Expand Up @@ -113,6 +114,43 @@ func (g *Plugin) OnQuotaDelete(obj interface{}) {

}

func (g *Plugin) ReplaceQuotas(objs []interface{}) error {
quotas := make([]*schedulerv1alpha1.ElasticQuota, 0, len(objs))
for _, obj := range objs {
quota := obj.(*schedulerv1alpha1.ElasticQuota)
quotas = append(quotas, quota)
}

start := time.Now()
defer func() {
klog.Infof("ReplaceQuotas replace %v quotas take %v", len(quotas), time.Since(start))
}()

g.groupQuotaManagersForQuotaTree = make(map[string]*core.GroupQuotaManager)
g.groupQuotaManager = core.NewGroupQuotaManager("", g.pluginArgs.SystemQuotaGroupMax, g.pluginArgs.DefaultQuotaGroupMax)
g.quotaToTreeMap = make(map[string]string)
g.quotaToTreeMap[extension.DefaultQuotaName] = ""
g.quotaToTreeMap[extension.SystemQuotaName] = ""

for _, quota := range quotas {
if quota.DeletionTimestamp != nil {
continue
}
mgr := g.GetOrCreateGroupQuotaManagerForTree(quota.Labels[extension.LabelQuotaTreeID])
treeID := mgr.GetTreeID()
g.updateQuotaToTreeMap(quota.Name, treeID)
g.handlerQuotaWhenRoot(quota, mgr, false)
mgr.UpdateQuotaInfo(quota)
}

g.groupQuotaManager.ResetQuota()
for _, mgr := range g.groupQuotaManagersForQuotaTree {
mgr.ResetQuota()
}

return nil
}

func (g *Plugin) GetQuotaSummary(quotaName string, includePods bool) (*core.QuotaInfoSummary, bool) {
mgr := g.GetGroupQuotaManagerForQuota(quotaName)
return mgr.GetQuotaSummary(quotaName, includePods)
Expand Down

0 comments on commit 6dd540e

Please sign in to comment.