diff --git a/pkg/resources/manifests.go b/pkg/resources/manifests.go index 78cd1930f..68c59fc25 100644 --- a/pkg/resources/manifests.go +++ b/pkg/resources/manifests.go @@ -5,6 +5,8 @@ package resources import ( "context" "fmt" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/utils/pointer" "k8s.io/apimachinery/pkg/util/intstr" @@ -182,6 +184,76 @@ func GenerateStatefulSetManifest(ctx context.Context, workspaceObj *kaitov1alpha return ss } +func GenerateTuningJobManifest(ctx context.Context, wObj *kaitov1alpha1.Workspace, imageName string, + imagePullSecretRefs []corev1.LocalObjectReference, replicas int, commands []string, containerPorts []corev1.ContainerPort, + livenessProbe, readinessProbe *corev1.Probe, resourceRequirements corev1.ResourceRequirements, tolerations []corev1.Toleration, + initContainers []corev1.Container, volumes []corev1.Volume, volumeMounts []corev1.VolumeMount) *batchv1.Job { + labels := map[string]string{ + kaitov1alpha1.LabelWorkspaceName: wObj.Name, + } + //TODO: + // Will be included in future PR, this code includes + // bash script for pushing results based on user + // data destination method + //pushMethod, pushArg := determinePushMethod(wObj) + return &batchv1.Job{ + TypeMeta: v1.TypeMeta{ + APIVersion: "batch/v1", + Kind: "Job", + }, + ObjectMeta: v1.ObjectMeta{ + Name: wObj.Name, + Namespace: wObj.Namespace, + Labels: labels, + OwnerReferences: []v1.OwnerReference{ + { + APIVersion: kaitov1alpha1.GroupVersion.String(), + Kind: "Workspace", + Name: wObj.Name, + UID: wObj.UID, + Controller: pointer.BoolPtr(true), + }, + }, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + InitContainers: initContainers, + Containers: []corev1.Container{ + { + Name: wObj.Name, + Image: imageName, + Command: commands, + Resources: resourceRequirements, + LivenessProbe: livenessProbe, + ReadinessProbe: readinessProbe, + Ports: containerPorts, + VolumeMounts: volumeMounts, + }, + { + Name: "docker-sidecar", + Image: "docker:dind", + SecurityContext: &corev1.SecurityContext{ + Privileged: pointer.BoolPtr(true), + }, + VolumeMounts: volumeMounts, + Command: []string{"/bin/sh", "-c"}, + // TODO: Args: []string{pushMethod(pushArg)}, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + Volumes: volumes, + Tolerations: tolerations, + ImagePullSecrets: imagePullSecretRefs, + }, + }, + }, + } +} + func GenerateDeploymentManifest(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, imageName string, imagePullSecretRefs []corev1.LocalObjectReference, replicas int, commands []string, containerPorts []corev1.ContainerPort, livenessProbe, readinessProbe *corev1.Probe, resourceRequirements corev1.ResourceRequirements, diff --git a/pkg/tuning/preset-tuning.go b/pkg/tuning/preset-tuning.go index 960d62270..079a0662e 100644 --- a/pkg/tuning/preset-tuning.go +++ b/pkg/tuning/preset-tuning.go @@ -3,6 +3,7 @@ package tuning import ( "context" "fmt" + "k8s.io/apimachinery/pkg/api/resource" "os" kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1" @@ -15,7 +16,35 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func GetInstanceGPUCount(sku string) int { +const ( + Port5000 = int32(5000) + TuningFile = "fine_tuning_api.py" +) + +var ( + containerPorts = []corev1.ContainerPort{{ + ContainerPort: Port5000, + }} + + // Come up with valid liveness and readiness probes for fine-tuning + // TODO: livenessProbe = &corev1.Probe{} + // TODO: readinessProbe = &corev1.Probe{} + + tolerations = []corev1.Toleration{ + { + Effect: corev1.TaintEffectNoSchedule, + Operator: corev1.TolerationOpEqual, + Key: resources.GPUString, + }, + { + Effect: corev1.TaintEffectNoSchedule, + Value: resources.GPUString, + Key: "sku", + }, + } +) + +func getInstanceGPUCount(sku string) int { gpuConfig, exists := kaitov1alpha1.SupportedGPUConfigs[sku] if !exists { return 1 @@ -44,12 +73,8 @@ func GetDataDestImageInfo(ctx context.Context, wObj *kaitov1alpha1.Workspace) (s func EnsureTuningConfigMap(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, tuningObj *model.PresetParam, kubeClient client.Client) error { // Copy Configmap from helm chart configmap into workspace - releaseNamespace, err := utils.GetReleaseNamespace() - if err != nil { - return fmt.Errorf("failed to get release namespace: %v", err) - } existingCM := &corev1.ConfigMap{} - err = resources.GetResource(ctx, workspaceObj.Tuning.ConfigTemplate, workspaceObj.Namespace, kubeClient, existingCM) + err := resources.GetResource(ctx, workspaceObj.Tuning.ConfigTemplate, workspaceObj.Namespace, kubeClient, existingCM) if err != nil { if !errors.IsNotFound(err) { return err @@ -59,6 +84,10 @@ func EnsureTuningConfigMap(ctx context.Context, workspaceObj *kaitov1alpha1.Work return nil } + releaseNamespace, err := utils.GetReleaseNamespace() + if err != nil { + return fmt.Errorf("failed to get release namespace: %v", err) + } templateCM := &corev1.ConfigMap{} err = resources.GetResource(ctx, workspaceObj.Tuning.ConfigTemplate, releaseNamespace, kubeClient, templateCM) if err != nil { @@ -80,6 +109,107 @@ func EnsureTuningConfigMap(ctx context.Context, workspaceObj *kaitov1alpha1.Work func CreatePresetTuning(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, tuningObj *model.PresetParam, kubeClient client.Client) (client.Object, error) { - // TODO - return nil, nil + initContainers, imagePullSecrets, volumes, volumeMounts, err := prepareDataSource(ctx, workspaceObj, kubeClient) + if err != nil { + return nil, err + } + + err = EnsureTuningConfigMap(ctx, workspaceObj, tuningObj, kubeClient) + if err != nil { + return nil, err + } + + shmVolume, shmVolumeMount := utils.ConfigSHMVolume(*workspaceObj.Resource.Count) + if shmVolume.Name != "" { + volumes = append(volumes, shmVolume) + } + if shmVolumeMount.Name != "" { + volumeMounts = append(volumeMounts, shmVolumeMount) + } + + cmVolume, cmVolumeMount := utils.ConfigCMVolume(workspaceObj.Tuning.ConfigTemplate) + volumes = append(volumes, cmVolume) + volumeMounts = append(volumeMounts, cmVolumeMount) + + modelCommand, err := prepareModelRunParameters(ctx, tuningObj) + if err != nil { + return nil, err + } + commands, resourceReq := prepareTuningParameters(ctx, workspaceObj, modelCommand, tuningObj) + tuningImage := GetTuningImageInfo(ctx, workspaceObj, tuningObj) + + jobObj := resources.GenerateTuningJobManifest(ctx, workspaceObj, tuningImage, imagePullSecrets, *workspaceObj.Resource.Count, commands, + containerPorts, nil, nil, resourceReq, tolerations, initContainers, volumes, volumeMounts) + + err = resources.CreateResource(ctx, jobObj, kubeClient) + if client.IgnoreAlreadyExists(err) != nil { + return nil, err + } + return jobObj, nil +} + +// Now there are three options for DataSource: 1. URL - 2. HostPath - 3. Image +func prepareDataSource(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) ([]corev1.Container, []corev1.LocalObjectReference, []corev1.Volume, []corev1.VolumeMount, error) { + var initContainers []corev1.Container + var volumes []corev1.Volume + var volumeMounts []corev1.VolumeMount + var imagePullSecrets []corev1.LocalObjectReference + switch { + case workspaceObj.Tuning.Input.Image != "": + initContainers, volumes, volumeMounts = handleImageDataSource(ctx, workspaceObj) + _, imagePullSecrets = GetDataSrcImageInfo(ctx, workspaceObj) + // TODO: Future PR include + // case len(workspaceObj.Tuning.Input.URLs) > 0: + // case workspaceObj.Tuning.Input.Volume != nil: + } + return initContainers, imagePullSecrets, volumes, volumeMounts, nil +} + +func handleImageDataSource(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace) ([]corev1.Container, []corev1.Volume, []corev1.VolumeMount) { + var initContainers []corev1.Container + // Constructing a multistep command that lists, copies, and then lists the destination + command := "ls -la /data && cp -r /data/* " + utils.DefaultDataVolumePath + " && ls -la " + utils.DefaultDataVolumePath + initContainers = append(initContainers, corev1.Container{ + Name: "data-extractor", + Image: workspaceObj.Tuning.Input.Image, + Command: []string{"sh", "-c", command}, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "data-volume", + MountPath: utils.DefaultDataVolumePath, + }, + }, + }) + + volumes, volumeMounts := utils.ConfigDataVolume("") + return initContainers, volumes, volumeMounts +} + +func prepareModelRunParameters(ctx context.Context, tuningObj *model.PresetParam) (string, error) { + modelCommand := utils.BuildCmdStr(TuningFile, tuningObj.ModelRunParams) + return modelCommand, nil +} + +// prepareTuningParameters builds a PyTorch command: +// accelerate launch baseCommand +// and sets the GPU resources required for tuning. +// Returns the command and resource configuration. +func prepareTuningParameters(ctx context.Context, wObj *kaitov1alpha1.Workspace, modelCommand string, tuningObj *model.PresetParam) ([]string, corev1.ResourceRequirements) { + // Set # of processes to GPU Count + numProcesses := getInstanceGPUCount(wObj.Resource.InstanceType) + tuningObj.TorchRunParams["num_processes"] = fmt.Sprintf("%d", numProcesses) + torchCommand := utils.BuildCmdStr(tuningObj.BaseCommand, tuningObj.TorchRunParams) + torchCommand = utils.BuildCmdStr(torchCommand, tuningObj.TorchRunRdzvParams) + commands := utils.ShellCmd(torchCommand + " " + modelCommand) + + resourceRequirements := corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(tuningObj.GPUCountRequirement), + }, + Limits: corev1.ResourceList{ + corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(tuningObj.GPUCountRequirement), + }, + } + + return commands, resourceRequirements } diff --git a/pkg/tuning/preset-tuning_test.go b/pkg/tuning/preset-tuning_test.go new file mode 100644 index 000000000..13a983210 --- /dev/null +++ b/pkg/tuning/preset-tuning_test.go @@ -0,0 +1,342 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package tuning + +import ( + "context" + kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1" + "github.com/azure/kaito/pkg/model" + "github.com/azure/kaito/pkg/utils/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/utils/pointer" + "os" + "testing" +) + +// Mocking the SupportedGPUConfigs to be used in test scenarios. +var mockSupportedGPUConfigs = map[string]kaitov1alpha1.GPUConfig{ + "sku1": {GPUCount: 2}, + "sku2": {GPUCount: 4}, + "sku3": {GPUCount: 0}, +} + +func TestGetInstanceGPUCount(t *testing.T) { + kaitov1alpha1.SupportedGPUConfigs = mockSupportedGPUConfigs + testcases := map[string]struct { + sku string + expectedGPUCount int + }{ + "SKU Exists With Multiple GPUs": { + sku: "sku1", + expectedGPUCount: 2, + }, + "SKU Exists With Zero GPUs": { + sku: "sku3", + expectedGPUCount: 0, + }, + "SKU Does Not Exist": { + sku: "sku_unknown", + expectedGPUCount: 1, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + result := getInstanceGPUCount(tc.sku) + assert.Equal(t, tc.expectedGPUCount, result) + }) + } +} + +func TestGetTuningImageInfo(t *testing.T) { + // Setting up test environment + originalRegistryName := os.Getenv("PRESET_REGISTRY_NAME") + defer func() { + os.Setenv("PRESET_REGISTRY_NAME", originalRegistryName) // Reset after tests + }() + + testcases := map[string]struct { + registryName string + wObj *kaitov1alpha1.Workspace + presetObj *model.PresetParam + expected string + }{ + "Valid Registry and Parameters": { + registryName: "testregistry", + wObj: &kaitov1alpha1.Workspace{ + Tuning: &kaitov1alpha1.TuningSpec{ + Preset: &kaitov1alpha1.PresetSpec{ + PresetMeta: kaitov1alpha1.PresetMeta{ + Name: "testpreset", + }, + }, + }, + }, + presetObj: &model.PresetParam{ + Tag: "latest", + }, + expected: "testregistry/kaito-tuning-testpreset:latest", + }, + "Empty Registry Name": { + registryName: "", + wObj: &kaitov1alpha1.Workspace{ + Tuning: &kaitov1alpha1.TuningSpec{ + Preset: &kaitov1alpha1.PresetSpec{ + PresetMeta: kaitov1alpha1.PresetMeta{ + Name: "testpreset", + }, + }, + }, + }, + presetObj: &model.PresetParam{ + Tag: "latest", + }, + expected: "/kaito-tuning-testpreset:latest", + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + os.Setenv("PRESET_REGISTRY_NAME", tc.registryName) + result := GetTuningImageInfo(context.Background(), tc.wObj, tc.presetObj) + assert.Equal(t, tc.expected, result) + }) + } +} + +func TestGetDataSrcImageInfo(t *testing.T) { + testcases := map[string]struct { + wObj *kaitov1alpha1.Workspace + expectedImage string + expectedSecrets []corev1.LocalObjectReference + }{ + "Multiple Image Pull Secrets": { + wObj: &kaitov1alpha1.Workspace{ + Tuning: &kaitov1alpha1.TuningSpec{ + Input: &kaitov1alpha1.DataSource{ + Image: "kaito/data-source", + ImagePullSecrets: []string{"secret1", "secret2"}, + }, + }, + }, + expectedImage: "kaito/data-source", + expectedSecrets: []corev1.LocalObjectReference{ + {Name: "secret1"}, + {Name: "secret2"}, + }, + }, + "No Image Pull Secrets": { + wObj: &kaitov1alpha1.Workspace{ + Tuning: &kaitov1alpha1.TuningSpec{ + Input: &kaitov1alpha1.DataSource{ + Image: "kaito/data-source", + }, + }, + }, + expectedImage: "kaito/data-source", + expectedSecrets: []corev1.LocalObjectReference{}, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + resultImage, resultSecrets := GetDataSrcImageInfo(context.Background(), tc.wObj) + assert.Equal(t, tc.expectedImage, resultImage) + assert.Equal(t, tc.expectedSecrets, resultSecrets) + }) + } +} + +func TestEnsureTuningConfigMap(t *testing.T) { + testcases := map[string]struct { + callMocks func(c *test.MockClient) + workspaceObj *kaitov1alpha1.Workspace + expectedError string + }{ + "Config already exists in workspace namespace": { + callMocks: func(c *test.MockClient) { + os.Setenv("RELEASE_NAMESPACE", "release-namespace") + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&corev1.ConfigMap{}), mock.Anything).Return(nil) + }, + workspaceObj: &kaitov1alpha1.Workspace{ + Tuning: &kaitov1alpha1.TuningSpec{ + ConfigTemplate: "config-template", + }, + }, + expectedError: "", + }, + "Error finding release namespace": { + callMocks: func(c *test.MockClient) { + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&corev1.ConfigMap{}), mock.Anything).Return(errors.NewNotFound(schema.GroupResource{}, "config-template")) + }, + workspaceObj: &kaitov1alpha1.Workspace{ + Tuning: &kaitov1alpha1.TuningSpec{ + ConfigTemplate: "config-template", + }, + }, + expectedError: "failed to get ConfigMap from template namespace: \"config-template\" not found", + }, + "Config doesn't exist in template namespace": { + callMocks: func(c *test.MockClient) { + os.Setenv("RELEASE_NAMESPACE", "release-namespace") + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&corev1.ConfigMap{}), mock.Anything).Return(errors.NewNotFound(schema.GroupResource{}, "config-template")) + }, + workspaceObj: &kaitov1alpha1.Workspace{ + Tuning: &kaitov1alpha1.TuningSpec{ + ConfigTemplate: "config-template", + }, + }, + expectedError: "failed to get ConfigMap from template namespace: \"config-template\" not found", + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + mockClient := test.NewClient() + tc.callMocks(mockClient) + tc.workspaceObj.SetNamespace("workspace-namespace") + err := EnsureTuningConfigMap(context.Background(), tc.workspaceObj, nil, mockClient) + if tc.expectedError != "" { + assert.EqualError(t, err, tc.expectedError) + } else { + assert.NoError(t, err) + } + mockClient.AssertExpectations(t) + }) + } +} + +func TestHandleImageDataSource(t *testing.T) { + testcases := map[string]struct { + workspaceObj *kaitov1alpha1.Workspace + expectedInitContainerName string + expectedVolumeName string + expectedVolumeMountPath string + }{ + "Handle Image Data Source": { + workspaceObj: &kaitov1alpha1.Workspace{ + Resource: kaitov1alpha1.ResourceSpec{ + Count: pointer.Int(1), + }, + Tuning: &kaitov1alpha1.TuningSpec{ + Input: &kaitov1alpha1.DataSource{ + Image: "data-image", + }, + }, + }, + expectedInitContainerName: "data-extractor", + expectedVolumeName: "data-volume", + expectedVolumeMountPath: "/mnt/data", + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + initContainers, volumes, volumeMounts := handleImageDataSource(context.Background(), tc.workspaceObj) + + assert.Len(t, initContainers, 1) + assert.Equal(t, tc.expectedInitContainerName, initContainers[0].Name) + assert.Equal(t, tc.workspaceObj.Tuning.Input.Image, initContainers[0].Image) + assert.Contains(t, initContainers[0].Command[2], "cp -r /data/* /mnt/data") + + assert.Len(t, volumes, 1) + assert.Equal(t, tc.expectedVolumeName, volumes[0].Name) + + assert.Len(t, volumeMounts, 1) + assert.Equal(t, tc.expectedVolumeMountPath, volumeMounts[0].MountPath) + }) + } +} + +func TestPrepareTuningParameters(t *testing.T) { + ctx := context.TODO() + + testcases := map[string]struct { + name string + workspaceObj *kaitov1alpha1.Workspace + modelCommand string + tuningObj *model.PresetParam + expectedCommands []string + expectedRequirements corev1.ResourceRequirements + }{ + "Basic Tuning Parameters Setup": { + workspaceObj: &kaitov1alpha1.Workspace{ + Resource: kaitov1alpha1.ResourceSpec{ + InstanceType: "gpu-instance-type", + }, + }, + modelCommand: "model-command", + tuningObj: &model.PresetParam{ + BaseCommand: "python train.py", + TorchRunParams: map[string]string{}, + TorchRunRdzvParams: map[string]string{}, + GPUCountRequirement: "2", + }, + expectedCommands: []string{"/bin/sh", "-c", "python train.py --num_processes=1 model-command"}, + expectedRequirements: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("2"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("2"), + }, + }, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + commands, resources := prepareTuningParameters(ctx, tc.workspaceObj, tc.modelCommand, tc.tuningObj) + assert.Equal(t, tc.expectedCommands, commands) + assert.Equal(t, tc.expectedRequirements.Requests, resources.Requests) + assert.Equal(t, tc.expectedRequirements.Limits, resources.Limits) + }) + } +} + +func TestPrepareDataSource_ImageSource(t *testing.T) { + ctx := context.TODO() + + workspaceObj := &kaitov1alpha1.Workspace{ + Tuning: &kaitov1alpha1.TuningSpec{ + Input: &kaitov1alpha1.DataSource{ + Image: "custom/data-loader-image", + }, + }, + } + + // Expected outputs from mocked functions + expectedVolumes := []corev1.Volume{ + { + Name: "data-volume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, // Assume we expect an EmptyDir + }, + }, + } + expectedVolumeMounts := []corev1.VolumeMount{{Name: "data-volume", MountPath: "/mnt/data"}} + expectedImagePullSecrets := []corev1.LocalObjectReference{} + expectedInitContainers := []corev1.Container{ + { + Name: "data-extractor", + Image: "custom/data-loader-image", + Command: []string{"sh", "-c", "ls -la /data && cp -r /data/* /mnt/data && ls -la /mnt/data"}, + VolumeMounts: expectedVolumeMounts, + }, + } + + initContainers, imagePullSecrets, volumes, volumeMounts, err := prepareDataSource(ctx, workspaceObj, nil) + + // Assertions + assert.NoError(t, err) + assert.Equal(t, expectedInitContainers, initContainers) + assert.Equal(t, expectedVolumes, volumes) + assert.Equal(t, expectedVolumeMounts, volumeMounts) + assert.Equal(t, expectedImagePullSecrets, imagePullSecrets) +} diff --git a/pkg/utils/common-preset.go b/pkg/utils/common-preset.go index 17b749853..4bfeaff7f 100644 --- a/pkg/utils/common-preset.go +++ b/pkg/utils/common-preset.go @@ -9,7 +9,7 @@ import ( const ( DefaultVolumeMountPath = "/dev/shm" DefaultConfigMapMountPath = "/config" - DefaultDataVolumePath = "/data" + DefaultDataVolumePath = "/mnt/data" ) func ConfigSHMVolume(instanceCount int) (corev1.Volume, corev1.VolumeMount) {