From 60477f70673d5eced4d846de7743c90ccf8eb4a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Skocze=C5=84?= Date: Thu, 19 Dec 2024 10:06:57 +0000 Subject: [PATCH] Parallelize cluster snapshot creation --- .../config/autoscaling_options.go | 2 + .../filter_out_schedulable_test.go | 2 +- cluster-autoscaler/main.go | 4 +- .../pod_injection_processor_test.go | 2 +- .../predicate/predicate_snapshot_test.go | 2 +- .../simulator/clustersnapshot/store/delta.go | 102 +++++++++++++++--- .../store/delta_benchmark_test.go | 4 +- 7 files changed, 97 insertions(+), 21 deletions(-) diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index ed4a8b645801..c5564ed3a5f3 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -311,6 +311,8 @@ type AutoscalingOptions struct { ForceDeleteLongUnregisteredNodes bool // DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled. DynamicResourceAllocationEnabled bool + // ClusterSnapshotParallelization is the maximum parallelization of cluster snapshot creation. + ClusterSnapshotParallelization int } // KubeClientOptions specify options for kube client diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go index c8510cb0ac17..61303ed3a8d1 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go @@ -254,7 +254,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) { return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore()) }, "delta": func() clustersnapshot.ClusterSnapshot { - return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore()) + return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore(16)) }, } for snapshotName, snapshotFactory := range snapshots { diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index fefe90f992c6..190c72cb9dfa 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -281,6 +281,7 @@ var ( checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.") forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.") enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.") + clusterSnapshotParallelization = flag.Int("cluster-snapshot-parallelization", 16, "Maximum parallelization of cluster snapshot creation.") ) func isFlagPassed(name string) bool { @@ -461,6 +462,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox, ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes, DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation, + ClusterSnapshotParallelization: *clusterSnapshotParallelization, } } @@ -506,7 +508,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot opts := core.AutoscalerOptions{ AutoscalingOptions: autoscalingOptions, FrameworkHandle: fwHandle, - ClusterSnapshot: predicate.NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled), + ClusterSnapshot: predicate.NewPredicateSnapshot(store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelization), fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled), KubeClient: kubeClient, InformerFactory: informerFactory, DebuggingSnapshotter: debuggingSnapshotter, diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go index 5ef275fd3dcf..4ce6b895b15f 100644 --- a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go +++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go @@ -114,7 +114,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry()) - clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore()) + clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16)) err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...)) assert.NoError(t, err) ctx := context.AutoscalingContext{ diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go index ba975cafceac..f9df2eacad33 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot_test.go @@ -46,7 +46,7 @@ var snapshots = map[string]func() (clustersnapshot.ClusterSnapshot, error){ if err != nil { return nil, err } - return NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle, true), nil + return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true), nil }, } diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go index 40154047491a..2b9a925ab0b7 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go @@ -17,11 +17,13 @@ limitations under the License. package store import ( + "context" "fmt" apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -44,7 +46,8 @@ import ( // pod affinity - causes scheduler framework to list pods with non-empty selector, // so basic caching doesn't help. type DeltaSnapshotStore struct { - data *internalDeltaSnapshotData + data *internalDeltaSnapshotData + parallelization int } type deltaSnapshotStoreNodeLister DeltaSnapshotStore @@ -137,10 +140,14 @@ func (data *internalDeltaSnapshotData) buildNodeInfoList() []*schedulerframework return nodeInfoList } -func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) error { +func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) (*schedulerframework.NodeInfo, error) { nodeInfo := schedulerframework.NewNodeInfo() nodeInfo.SetNode(node) - return data.addNodeInfo(nodeInfo) + err := data.addNodeInfo(nodeInfo) + if err != nil { + return nil, err + } + return nodeInfo, nil } func (data *internalDeltaSnapshotData) addNodeInfo(nodeInfo *schedulerframework.NodeInfo) error { @@ -241,6 +248,24 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e return nil } +func (data *internalDeltaSnapshotData) addPodToNode(pod *apiv1.Pod, ni *schedulerframework.NodeInfo) error { + ni.AddPod(pod) + + // Maybe consider deleting from the list in the future. Maybe not. + data.clearCaches() + return nil +} + +func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error { + for _, pod := range pods { + ni.AddPod(pod) + } + + // Maybe consider deleting from the list in the future. Maybe not. + data.clearCaches() + return nil +} + func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error { // This always clones node info, even if the pod is actually missing. // Not sure if we mind, since removing non-existent pod @@ -403,8 +428,10 @@ func (snapshot *DeltaSnapshotStore) DeviceClasses() schedulerframework.DeviceCla } // NewDeltaSnapshotStore creates instances of DeltaSnapshotStore. -func NewDeltaSnapshotStore() *DeltaSnapshotStore { - snapshot := &DeltaSnapshotStore{} +func NewDeltaSnapshotStore(parallelization int) *DeltaSnapshotStore { + snapshot := &DeltaSnapshotStore{ + parallelization: parallelization, + } snapshot.clear() return snapshot } @@ -417,7 +444,7 @@ func (snapshot *DeltaSnapshotStore) DraSnapshot() drasnapshot.Snapshot { // AddSchedulerNodeInfo adds a NodeInfo. func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error { - if err := snapshot.data.addNode(nodeInfo.Node()); err != nil { + if _, err := snapshot.data.addNode(nodeInfo.Node()); err != nil { return err } for _, podInfo := range nodeInfo.Pods { @@ -428,24 +455,69 @@ func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerfram return nil } +// setClusterStatePodsSequential sets the pods in cluster state in a sequential way. +func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error { + for _, pod := range scheduledPods { + if nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]; ok { + if err := snapshot.data.addPodToNode(pod, nodeInfos[nodeIdx]); err != nil { + return err + } + } + } + return nil +} + +// setClusterStatePodsParallelized sets the pods in cluster state in parallel based on snapshot.parallelization value. +func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error { + podsForNode := make([][]*apiv1.Pod, len(nodeInfos)) + for _, pod := range scheduledPods { + nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName] + if !ok { + continue + } + podsForNode[nodeIdx] = append(podsForNode[nodeIdx], pod) + } + + ctx := context.Background() + ctx, cancel := context.WithCancelCause(ctx) + + workqueue.ParallelizeUntil(ctx, snapshot.parallelization, len(nodeInfos), func(nodeIdx int) { + err := snapshot.data.addPodsToNode(podsForNode[nodeIdx], nodeInfos[nodeIdx]) + if err != nil { + cancel(err) + } + }) + + return context.Cause(ctx) +} + // SetClusterState sets the cluster state. func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error { snapshot.clear() - knownNodes := make(map[string]bool) - for _, node := range nodes { - if err := snapshot.data.addNode(node); err != nil { + nodeNameToIdx := make(map[string]int, len(nodes)) + nodeInfos := make([]*schedulerframework.NodeInfo, len(nodes)) + for i, node := range nodes { + nodeInfo, err := snapshot.data.addNode(node) + if err != nil { return err } - knownNodes[node.Name] = true + nodeNameToIdx[node.Name] = i + nodeInfos[i] = nodeInfo } - for _, pod := range scheduledPods { - if knownNodes[pod.Spec.NodeName] { - if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil { - return err - } + + if snapshot.parallelization > 1 { + err := snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods) + if err != nil { + return err + } + } else { + err := snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods) + if err != nil { + return err } } + // TODO(DRA): Save DRA snapshot. return nil } diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go b/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go index c10776cc9de8..5f618befd180 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/store/delta_benchmark_test.go @@ -48,7 +48,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { for _, tc := range testCases { b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) { nodes := clustersnapshot.CreateTestNodes(tc.nodeCount + 1000) - deltaStore := NewDeltaSnapshotStore() + deltaStore := NewDeltaSnapshotStore(16) if err := deltaStore.SetClusterState(nodes[:tc.nodeCount], nil, drasnapshot.Snapshot{}); err != nil { assert.NoError(b, err) } @@ -70,7 +70,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { for _, tc := range testCases { b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) { nodes := clustersnapshot.CreateTestNodes(tc.nodeCount) - deltaStore := NewDeltaSnapshotStore() + deltaStore := NewDeltaSnapshotStore(16) if err := deltaStore.SetClusterState(nodes, nil, drasnapshot.Snapshot{}); err != nil { assert.NoError(b, err) }