fix: Use SKU GPU Count in DeploymentSpec (#541)
**Reason for Change**:
In some cases a workspace was not utilizing all available GPUs on its machines by default, which caused slower inference requests and left resources unused. This change fixes that by sizing the GPU request from the SKU's GPU count.
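In effect (a hypothetical before/after; the counts are illustrative, not taken from this PR), the pod's GPU request goes from the preset's minimum to the full SKU count:

	// Before: GPU request pinned to the preset's minimum (illustrative value).
	requests[corev1.ResourceName("nvidia.com/gpu")] = resource.MustParse("1")
	// After: GPU request sized to the SKU, e.g. a hypothetical 4-GPU instance type.
	requests[corev1.ResourceName("nvidia.com/gpu")] = resource.MustParse("4")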
ishaansehgal99 authored Aug 6, 2024
1 parent 08d2800 commit 7899717
Showing 7 changed files with 211 additions and 16 deletions.
16 changes: 11 additions & 5 deletions pkg/inference/preset-inferences.go
@@ -138,7 +138,13 @@ func CreatePresetInference(ctx context.Context, workspaceObj *kaitov1alpha1.Work
 		volumeMounts = append(volumeMounts, adapterVolumeMount)
 	}
 
-	commands, resourceReq := prepareInferenceParameters(ctx, inferenceObj)
+	skuNumGPUs, err := utils.GetSKUNumGPUs(ctx, kubeClient, workspaceObj.Status.WorkerNodes,
+		workspaceObj.Resource.InstanceType, inferenceObj.GPUCountRequirement)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get SKU num GPUs: %v", err)
+	}
+
+	commands, resourceReq := prepareInferenceParameters(ctx, inferenceObj, skuNumGPUs)
 	image, imagePullSecrets := GetInferenceImageInfo(ctx, workspaceObj, inferenceObj)
 
 	var depObj client.Object
@@ -149,7 +155,7 @@ func CreatePresetInference(ctx context.Context, workspaceObj *kaitov1alpha1.Work
 		depObj = resources.GenerateDeploymentManifest(ctx, workspaceObj, image, imagePullSecrets, *workspaceObj.Resource.Count, commands,
 			containerPorts, livenessProbe, readinessProbe, resourceReq, tolerations, volumes, volumeMounts)
 	}
-	err := resources.CreateResource(ctx, depObj, kubeClient)
+	err = resources.CreateResource(ctx, depObj, kubeClient)
 	if client.IgnoreAlreadyExists(err) != nil {
 		return nil, err
 	}
@@ -160,18 +166,18 @@ func CreatePresetInference(ctx context.Context, workspaceObj *kaitov1alpha1.Work
 // torchrun <TORCH_PARAMS> <OPTIONAL_RDZV_PARAMS> baseCommand <MODEL_PARAMS>
 // and sets the GPU resources required for inference.
 // Returns the command and resource configuration.
-func prepareInferenceParameters(ctx context.Context, inferenceObj *model.PresetParam) ([]string, corev1.ResourceRequirements) {
+func prepareInferenceParameters(ctx context.Context, inferenceObj *model.PresetParam, skuNumGPUs string) ([]string, corev1.ResourceRequirements) {
 	torchCommand := utils.BuildCmdStr(inferenceObj.BaseCommand, inferenceObj.TorchRunParams)
 	torchCommand = utils.BuildCmdStr(torchCommand, inferenceObj.TorchRunRdzvParams)
 	modelCommand := utils.BuildCmdStr(InferenceFile, inferenceObj.ModelRunParams)
 	commands := utils.ShellCmd(torchCommand + " " + modelCommand)
 
 	resourceRequirements := corev1.ResourceRequirements{
 		Requests: corev1.ResourceList{
-			corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(inferenceObj.GPUCountRequirement),
+			corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(skuNumGPUs),
 		},
 		Limits: corev1.ResourceList{
-			corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(inferenceObj.GPUCountRequirement),
+			corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(skuNumGPUs),
 		},
 	}
 
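As the doc comment above notes, prepareInferenceParameters assembles the container command as torchrun <TORCH_PARAMS> <OPTIONAL_RDZV_PARAMS> baseCommand <MODEL_PARAMS>. A rough sketch of the composed result, with purely illustrative flag and file names (not taken from this PR):

	// torchCommand: BaseCommand plus TorchRunParams and TorchRunRdzvParams, e.g.
	//   torchrun --nproc_per_node=2 --nnodes=1 --rdzv_id=job --rdzv_backend=c10d
	// modelCommand: InferenceFile plus ModelRunParams, e.g.
	//   inference_api.py --torch_dtype float16
	// commands: the two strings joined with a space, wrapped by utils.ShellCmd
	// into the []string used as the container command.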
3 changes: 3 additions & 0 deletions pkg/inference/preset-inferences_test.go
@@ -5,6 +5,8 @@ package inference
 
 import (
 	"context"
+	"github.com/azure/kaito/pkg/utils/consts"
+	"os"
 	"reflect"
 	"strings"
 	"testing"
@@ -74,6 +76,7 @@ func TestCreatePresetInference(t *testing.T) {
 
 	for k, tc := range testcases {
 		t.Run(k, func(t *testing.T) {
+			os.Setenv("CLOUD_PROVIDER", consts.AzureCloudName)
 			mockClient := test.NewClient()
 			tc.callMocks(mockClient)
 
16 changes: 12 additions & 4 deletions pkg/tuning/preset-tuning.go
@@ -310,7 +310,14 @@ func CreatePresetTuning(ctx context.Context, workspaceObj *kaitov1alpha1.Workspa
 	if err != nil {
 		return nil, err
 	}
-	commands, resourceReq := prepareTuningParameters(ctx, workspaceObj, modelCommand, tuningObj)
+
+	skuNumGPUs, err := utils.GetSKUNumGPUs(ctx, kubeClient, workspaceObj.Status.WorkerNodes,
+		workspaceObj.Resource.InstanceType, tuningObj.GPUCountRequirement)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get SKU num GPUs: %v", err)
+	}
+
+	commands, resourceReq := prepareTuningParameters(ctx, workspaceObj, modelCommand, tuningObj, skuNumGPUs)
 	tuningImage, tuningImagePullSecrets := GetTuningImageInfo(ctx, workspaceObj, tuningObj)
 	if tuningImagePullSecrets != nil {
 		imagePullSecrets = append(imagePullSecrets, tuningImagePullSecrets...)
@@ -460,7 +467,8 @@ func prepareModelRunParameters(ctx context.Context, tuningObj *model.PresetParam
 // accelerate launch <TORCH_PARAMS> baseCommand <MODEL_PARAMS>
 // and sets the GPU resources required for tuning.
 // Returns the command and resource configuration.
-func prepareTuningParameters(ctx context.Context, wObj *kaitov1alpha1.Workspace, modelCommand string, tuningObj *model.PresetParam) ([]string, corev1.ResourceRequirements) {
+func prepareTuningParameters(ctx context.Context, wObj *kaitov1alpha1.Workspace, modelCommand string,
+	tuningObj *model.PresetParam, skuNumGPUs string) ([]string, corev1.ResourceRequirements) {
 	if tuningObj.TorchRunParams == nil {
 		tuningObj.TorchRunParams = make(map[string]string)
 	}
@@ -473,10 +481,10 @@ func prepareTuningParameters(ctx context.Context, wObj *kaitov1alpha1.Workspace,
 
 	resourceRequirements := corev1.ResourceRequirements{
 		Requests: corev1.ResourceList{
-			corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(tuningObj.GPUCountRequirement),
+			corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(skuNumGPUs),
 		},
 		Limits: corev1.ResourceList{
-			corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(tuningObj.GPUCountRequirement),
+			corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(skuNumGPUs),
 		},
 	}
 
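Note that requests and limits are set from the same value. nvidia.com/gpu is a Kubernetes extended resource, and extended resources cannot be overcommitted: if a limit is specified, the request must equal it, so pinning both sides to the resolved SKU GPU count is the only valid shape. Roughly (the count "2" here is illustrative):

	// Requests must equal limits for extended resources.
	Requests: corev1.ResourceList{"nvidia.com/gpu": resource.MustParse("2")},
	Limits:   corev1.ResourceList{"nvidia.com/gpu": resource.MustParse("2")},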
2 changes: 1 addition & 1 deletion pkg/tuning/preset-tuning_test.go
@@ -441,7 +441,7 @@ func TestPrepareTuningParameters(t *testing.T) {
 
 	for name, tc := range testcases {
 		t.Run(name, func(t *testing.T) {
-			commands, resources := prepareTuningParameters(ctx, tc.workspaceObj, tc.modelCommand, tc.tuningObj)
+			commands, resources := prepareTuningParameters(ctx, tc.workspaceObj, tc.modelCommand, tc.tuningObj, "2")
 			assert.Equal(t, tc.expectedCommands, commands)
 			assert.Equal(t, tc.expectedRequirements.Requests, resources.Requests)
 			assert.Equal(t, tc.expectedRequirements.Limits, resources.Limits)
70 changes: 64 additions & 6 deletions pkg/utils/common.go
@@ -4,16 +4,18 @@
 package utils
 
 import (
+	"context"
 	"fmt"
+	"io/ioutil"
+	"os"
+
+	"github.com/azure/kaito/pkg/sku"
+	"github.com/azure/kaito/pkg/utils/consts"
 	"gopkg.in/yaml.v2"
-	"io/ioutil"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
 	"knative.dev/pkg/apis"
 
-	"github.com/azure/kaito/pkg/sku"
-	"github.com/azure/kaito/pkg/utils/consts"
-	"os"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 func Contains(s []string, e string) bool {
@@ -114,3 +116,59 @@ func GetSKUHandler() (sku.CloudSKUHandler, error) {
 
 	return skuHandler, nil
 }
+
+func GetSKUNumGPUs(ctx context.Context, kubeClient client.Client, workerNodes []string, instanceType, defaultGPUCount string) (string, error) {
+	skuHandler, err := GetSKUHandler()
+	if err != nil {
+		return "", apis.ErrInvalidValue(fmt.Sprintf("Failed to get SKU handler: %v", err), "sku")
+	}
+
+	skuNumGPUs := defaultGPUCount // Default to using the provided default GPU count
+
+	skuConfig, skuExists := skuHandler.GetGPUConfigs()[instanceType]
+	if skuExists {
+		skuNumGPUs = fmt.Sprintf("%d", skuConfig.GPUCount)
+	} else {
+		skuGPUCount, err := FetchGPUCountFromNodes(ctx, kubeClient, workerNodes)
+		if err != nil {
+			fmt.Printf("Failed to fetch GPU count from nodes: %v", err)
+		} else if skuGPUCount != "" {
+			skuNumGPUs = skuGPUCount
+		}
+	}
+
+	return skuNumGPUs, nil
+}
+
+// FetchGPUCountFromNodes retrieves the GPU count from the given node names.
+func FetchGPUCountFromNodes(ctx context.Context, kubeClient client.Client, nodeNames []string) (string, error) {
+	if len(nodeNames) == 0 {
+		return "", fmt.Errorf("no worker nodes found in the workspace")
+	}
+
+	var allNodes v1.NodeList
+	for _, nodeName := range nodeNames {
+		nodeList := &v1.NodeList{}
+		fieldSelector := fields.OneTermEqualSelector("metadata.name", nodeName)
+		err := kubeClient.List(ctx, nodeList, &client.ListOptions{
+			FieldSelector: fieldSelector,
+		})
+		if err != nil {
+			fmt.Printf("Failed to list Node object %s: %v\n", nodeName, err)
+			continue
+		}
+		allNodes.Items = append(allNodes.Items, nodeList.Items...)
+	}
+
+	return GetPerNodeGPUCountFromNodes(&allNodes), nil
+}
+
+func GetPerNodeGPUCountFromNodes(nodeList *v1.NodeList) string {
+	for _, node := range nodeList.Items {
+		gpuCount, exists := node.Status.Capacity[consts.NvidiaGPU]
+		if exists && gpuCount.String() != "" {
+			return gpuCount.String()
+		}
+	}
+	return ""
+}
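Taken together, GetSKUNumGPUs resolves the GPU count in three steps: the static SKU table first, then live node capacity, then the caller-supplied default. A minimal caller sketch, assuming a hypothetical instance type and node name (neither comes from this PR):

	// 1. Static table:  skuHandler.GetGPUConfigs()[instanceType].GPUCount
	// 2. Node capacity: node.Status.Capacity["nvidia.com/gpu"] via FetchGPUCountFromNodes
	// 3. Fallback:      the provided default (the preset's GPUCountRequirement)
	count, err := utils.GetSKUNumGPUs(ctx, kubeClient, []string{"node-1"}, "Hypothetical_GPU_SKU", "1")
	if err != nil {
		return err
	}
	fmt.Println(count) // "2" if node-1 reports two GPUs and the SKU table has no entry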
119 changes: 119 additions & 0 deletions pkg/utils/common_test.go
@@ -0,0 +1,119 @@
package utils

import (
"context"
"sigs.k8s.io/controller-runtime/pkg/client"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestFetchGPUCountFromNodes(t *testing.T) {
node1 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: corev1.NodeStatus{
Capacity: corev1.ResourceList{
"nvidia.com/gpu": resource.MustParse("2"),
},
},
}

node2 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-2",
},
Status: corev1.NodeStatus{
Capacity: corev1.ResourceList{
"nvidia.com/gpu": resource.MustParse("4"),
},
},
}

node3 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-3",
},
Status: corev1.NodeStatus{
Capacity: corev1.ResourceList{},
},
}

tests := []struct {
name string
nodeNames []string
nodes []runtime.Object
expectedGPU string
expectErr bool
expectedErr string
}{
{
name: "Single Node with GPU",
nodeNames: []string{"node-1"},
nodes: []runtime.Object{node1},
expectedGPU: "2",
},
{
name: "Multiple Nodes with GPU",
nodeNames: []string{"node-1", "node-2"},
nodes: []runtime.Object{node1, node2},
expectedGPU: "2",
},
{
name: "Node without GPU",
nodeNames: []string{"node-3"},
nodes: []runtime.Object{node3},
expectedGPU: "",
},
{
name: "No Worker Nodes",
nodeNames: []string{},
nodes: []runtime.Object{},
expectedGPU: "",
expectErr: true,
expectedErr: "no worker nodes found in the workspace",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Set up the fake client with the indexer
s := scheme.Scheme
s.AddKnownTypes(corev1.SchemeGroupVersion, &corev1.Node{}, &corev1.NodeList{})

// Create an indexer function for the "metadata.name" field
indexFunc := func(obj client.Object) []string {
return []string{obj.(*corev1.Node).Name}
}

// Build the fake client with the indexer
kubeClient := fake.NewClientBuilder().
WithScheme(s).
WithRuntimeObjects(tt.nodes...).
WithIndex(&corev1.Node{}, "metadata.name", indexFunc).
Build()

// Call the function
gpuCount, err := FetchGPUCountFromNodes(context.TODO(), kubeClient, tt.nodeNames)

// Check the error
if tt.expectErr {
require.Error(t, err)
assert.Equal(t, tt.expectedErr, err.Error())
} else {
require.NoError(t, err)
}

// Check the GPU count
assert.Equal(t, tt.expectedGPU, gpuCount)
})
}
}
1 change: 1 addition & 0 deletions pkg/utils/consts/consts.go
@@ -14,4 +14,5 @@ const (
 	SKUString               = "sku"
 	MaxRevisionHistoryLimit = 10
 	GiBToBytes              = 1024 * 1024 * 1024 // Conversion factor from GiB to bytes
+	NvidiaGPU               = "nvidia.com/gpu"
 )
