Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize creating cluster snapshot #7630

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ type AutoscalingOptions struct {
ForceDeleteLongUnregisteredNodes bool
// DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled.
DynamicResourceAllocationEnabled bool
// ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
ClusterSnapshotParallelism int
}

// KubeClientOptions specify options for kube client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore())
},
"delta": func() clustersnapshot.ClusterSnapshot {
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore())
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore(16))
},
}
for snapshotName, snapshotFactory := range snapshots {
Expand Down
4 changes: 3 additions & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ var (
checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.")
forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.")
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -463,6 +464,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox,
ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
ClusterSnapshotParallelism: *clusterSnapshotParallelism,
}
}

Expand Down Expand Up @@ -505,7 +507,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)

var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore()
var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelism)
if autoscalingOptions.DynamicResourceAllocationEnabled {
// TODO(DRA): Remove this once DeltaSnapshotStore is integrated with DRA.
klog.Warningf("Using BasicSnapshotStore instead of DeltaSnapshotStore because DRA is enabled. Autoscaling performance/scalability might be decreased.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore())
clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16))
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
assert.NoError(t, err)
ctx := context.AutoscalingContext{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var snapshots = map[string]func() (clustersnapshot.ClusterSnapshot, error){
if err != nil {
return nil, err
}
return NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle, true), nil
return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true), nil
},
}

Expand Down
104 changes: 89 additions & 15 deletions cluster-autoscaler/simulator/clustersnapshot/store/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package store

import (
"context"
"fmt"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
Expand All @@ -44,7 +46,8 @@ import (
// pod affinity - causes scheduler framework to list pods with non-empty selector,
// so basic caching doesn't help.
type DeltaSnapshotStore struct {
data *internalDeltaSnapshotData
data *internalDeltaSnapshotData
parallelism int
}

type deltaSnapshotStoreNodeLister DeltaSnapshotStore
Expand Down Expand Up @@ -137,10 +140,14 @@ func (data *internalDeltaSnapshotData) buildNodeInfoList() []*schedulerframework
return nodeInfoList
}

func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) error {
func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) (*schedulerframework.NodeInfo, error) {
nodeInfo := schedulerframework.NewNodeInfo()
nodeInfo.SetNode(node)
return data.addNodeInfo(nodeInfo)
err := data.addNodeInfo(nodeInfo)
if err != nil {
return nil, err
}
return nodeInfo, nil
}

func (data *internalDeltaSnapshotData) addNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
Expand Down Expand Up @@ -241,6 +248,24 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e
return nil
}

func (data *internalDeltaSnapshotData) addPodToNode(pod *apiv1.Pod, ni *schedulerframework.NodeInfo) error {
ni.AddPod(pod)

// Maybe consider deleting from the list in the future. Maybe not.
data.clearCaches()
return nil
}

func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error {
for _, pod := range pods {
ni.AddPod(pod)
}

// Maybe consider deleting from the list in the future. Maybe not.
data.clearCaches()
return nil
}

func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error {
// This always clones node info, even if the pod is actually missing.
// Not sure if we mind, since removing non-existent pod
Expand Down Expand Up @@ -403,8 +428,10 @@ func (snapshot *DeltaSnapshotStore) DeviceClasses() schedulerframework.DeviceCla
}

// NewDeltaSnapshotStore creates instances of DeltaSnapshotStore.
func NewDeltaSnapshotStore() *DeltaSnapshotStore {
snapshot := &DeltaSnapshotStore{}
func NewDeltaSnapshotStore(parallelism int) *DeltaSnapshotStore {
snapshot := &DeltaSnapshotStore{
parallelism: parallelism,
}
snapshot.clear()
return snapshot
}
Expand All @@ -417,7 +444,7 @@ func (snapshot *DeltaSnapshotStore) DraSnapshot() drasnapshot.Snapshot {

// AddSchedulerNodeInfo adds a NodeInfo.
func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
if err := snapshot.data.addNode(nodeInfo.Node()); err != nil {
if _, err := snapshot.data.addNode(nodeInfo.Node()); err != nil {
return err
}
for _, podInfo := range nodeInfo.Pods {
Expand All @@ -428,24 +455,71 @@ func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerfram
return nil
}

// setClusterStatePodsSequential sets the pods in cluster state in a sequential way.
func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need a separate sequential implementation? It behaves the same as ...Parallelized one with parallelism == 1, right?

Copy link
Member Author

@macsko macsko Dec 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to leave parallelism == 1 with previous implementation to make the change potentially revertible (see the discussion above). Sequential implementation could be eventually removed and use workqueue.ParallelizedUntil always, when we are sure the behavior is correct.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it is just that code cleanups are easy to forget. Can you at least leave a TODO here with your github handle to follow up on this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

for _, pod := range scheduledPods {
if nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]; ok {
if err := snapshot.data.addPodToNode(pod, nodeInfos[nodeIdx]); err != nil {
return err
}
}
}
return nil
}

// setClusterStatePodsParallelized sets the pods in cluster state in parallel based on snapshot.parallelism value.
func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
podsForNode := make([][]*apiv1.Pod, len(nodeInfos))
for _, pod := range scheduledPods {
nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]
if !ok {
continue
}
podsForNode[nodeIdx] = append(podsForNode[nodeIdx], pod)
}

ctx := context.Background()
ctx, cancel := context.WithCancelCause(ctx)

workqueue.ParallelizeUntil(ctx, snapshot.parallelism, len(nodeInfos), func(nodeIdx int) {
err := snapshot.data.addPodsToNode(podsForNode[nodeIdx], nodeInfos[nodeIdx])
if err != nil {
cancel(err)
}
})

return context.Cause(ctx)
}

// SetClusterState sets the cluster state.
func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error {
snapshot.clear()

knownNodes := make(map[string]bool)
for _, node := range nodes {
if err := snapshot.data.addNode(node); err != nil {
nodeNameToIdx := make(map[string]int, len(nodes))
macsko marked this conversation as resolved.
Show resolved Hide resolved
nodeInfos := make([]*schedulerframework.NodeInfo, len(nodes))
for i, node := range nodes {
nodeInfo, err := snapshot.data.addNode(node)
if err != nil {
return err
}
knownNodes[node.Name] = true
nodeNameToIdx[node.Name] = i
nodeInfos[i] = nodeInfo
}
for _, pod := range scheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil {
return err
}

if snapshot.parallelism > 1 {
err := snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods)
if err != nil {
return err
}
} else {
// TODO(macsko): Migrate to setClusterStatePodsParallelized for parallelism == 1
// after making sure the implementation is always correct in CA 1.33.
err := snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods)
if err != nil {
return err
}
}

// TODO(DRA): Save DRA snapshot.
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
for _, tc := range testCases {
b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) {
nodes := clustersnapshot.CreateTestNodes(tc.nodeCount + 1000)
deltaStore := NewDeltaSnapshotStore()
deltaStore := NewDeltaSnapshotStore(16)
if err := deltaStore.SetClusterState(nodes[:tc.nodeCount], nil, drasnapshot.Snapshot{}); err != nil {
assert.NoError(b, err)
}
Expand All @@ -70,7 +70,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
for _, tc := range testCases {
b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) {
nodes := clustersnapshot.CreateTestNodes(tc.nodeCount)
deltaStore := NewDeltaSnapshotStore()
deltaStore := NewDeltaSnapshotStore(16)
if err := deltaStore.SetClusterState(nodes, nil, drasnapshot.Snapshot{}); err != nil {
assert.NoError(b, err)
}
Expand Down
Loading