feat: Add tuning job manifest, image source creation, parameter setup - Part 8 (#363)

**Reason for Change**:
This PR adds the tuning job manifest, the init container that sets up the dataset, and the command-line parameters needed to run the tuning job.
ishaansehgal99 authored Apr 28, 2024
1 parent 2d5d1b4 commit 962e3d6
Showing 4 changed files with 553 additions and 9 deletions.
72 changes: 72 additions & 0 deletions pkg/resources/manifests.go
@@ -5,6 +5,8 @@ package resources
import (
"context"
"fmt"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/utils/pointer"

"k8s.io/apimachinery/pkg/util/intstr"

@@ -182,6 +184,76 @@ func GenerateStatefulSetManifest(ctx context.Context, workspaceObj *kaitov1alpha
return ss
}

func GenerateTuningJobManifest(ctx context.Context, wObj *kaitov1alpha1.Workspace, imageName string,
imagePullSecretRefs []corev1.LocalObjectReference, replicas int, commands []string, containerPorts []corev1.ContainerPort,
livenessProbe, readinessProbe *corev1.Probe, resourceRequirements corev1.ResourceRequirements, tolerations []corev1.Toleration,
initContainers []corev1.Container, volumes []corev1.Volume, volumeMounts []corev1.VolumeMount) *batchv1.Job {
labels := map[string]string{
kaitov1alpha1.LabelWorkspaceName: wObj.Name,
}
// TODO: A future PR will add a bash script that pushes tuning results
// according to the user's chosen data destination method.
// pushMethod, pushArg := determinePushMethod(wObj)
return &batchv1.Job{
TypeMeta: v1.TypeMeta{
APIVersion: "batch/v1",
Kind: "Job",
},
ObjectMeta: v1.ObjectMeta{
Name: wObj.Name,
Namespace: wObj.Namespace,
Labels: labels,
OwnerReferences: []v1.OwnerReference{
{
APIVersion: kaitov1alpha1.GroupVersion.String(),
Kind: "Workspace",
Name: wObj.Name,
UID: wObj.UID,
Controller: pointer.BoolPtr(true),
},
},
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
InitContainers: initContainers,
Containers: []corev1.Container{
{
Name: wObj.Name,
Image: imageName,
Command: commands,
Resources: resourceRequirements,
LivenessProbe: livenessProbe,
ReadinessProbe: readinessProbe,
Ports: containerPorts,
VolumeMounts: volumeMounts,
},
{
Name: "docker-sidecar",
Image: "docker:dind",
SecurityContext: &corev1.SecurityContext{
Privileged: pointer.BoolPtr(true),
},
VolumeMounts: volumeMounts,
Command: []string{"/bin/sh", "-c"},
// TODO: Args: []string{pushMethod(pushArg)},
},
},
RestartPolicy: corev1.RestartPolicyNever,
Volumes: volumes,
Tolerations: tolerations,
ImagePullSecrets: imagePullSecretRefs,
},
},
},
}
}

func GenerateDeploymentManifest(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, imageName string,
imagePullSecretRefs []corev1.LocalObjectReference, replicas int, commands []string, containerPorts []corev1.ContainerPort,
livenessProbe, readinessProbe *corev1.Probe, resourceRequirements corev1.ResourceRequirements,
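For orientation, a minimal sketch of a call site for GenerateTuningJobManifest — all argument values here are hypothetical; the actual call lives in CreatePresetTuning in pkg/tuning/preset-tuning.go below:

	job := resources.GenerateTuningJobManifest(ctx, workspaceObj,
		"myregistry.azurecr.io/tuning:v1", // imageName (hypothetical)
		nil,                               // imagePullSecretRefs
		1,                                 // replicas
		[]string{"/bin/sh", "-c", "python3 fine_tuning_api.py"}, // commands
		nil, nil, nil,                     // containerPorts, livenessProbe, readinessProbe
		corev1.ResourceRequirements{},     // resourceRequirements
		nil, nil, nil, nil)                // tolerations, initContainers, volumes, volumeMounts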
146 changes: 138 additions & 8 deletions pkg/tuning/preset-tuning.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package tuning
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
"os"

kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1"
@@ -15,7 +16,35 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

-func GetInstanceGPUCount(sku string) int {
const (
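// Port the fine-tuning API container listens on, and the entrypoint
// script passed to the launcher (see prepareModelRunParameters below).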
Port5000 = int32(5000)
TuningFile = "fine_tuning_api.py"
)

var (
containerPorts = []corev1.ContainerPort{{
ContainerPort: Port5000,
}}

// TODO: Come up with valid liveness and readiness probes for fine-tuning.
// livenessProbe  = &corev1.Probe{}
// readinessProbe = &corev1.Probe{}

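// Tolerate the NoSchedule taints used on GPU nodes: key "gpu" (empty value) and "sku=gpu".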
tolerations = []corev1.Toleration{
{
Effect: corev1.TaintEffectNoSchedule,
Operator: corev1.TolerationOpEqual,
Key: resources.GPUString,
},
{
Effect: corev1.TaintEffectNoSchedule,
Value: resources.GPUString,
Key: "sku",
},
}
)

+func getInstanceGPUCount(sku string) int {
gpuConfig, exists := kaitov1alpha1.SupportedGPUConfigs[sku]
if !exists {
return 1
@@ -44,12 +73,8 @@ func GetDataDestImageInfo(ctx context.Context, wObj *kaitov1alpha1.Workspace) (s
func EnsureTuningConfigMap(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace,
tuningObj *model.PresetParam, kubeClient client.Client) error {
// Copy Configmap from helm chart configmap into workspace
-releaseNamespace, err := utils.GetReleaseNamespace()
-if err != nil {
-return fmt.Errorf("failed to get release namespace: %v", err)
-}
existingCM := &corev1.ConfigMap{}
-err = resources.GetResource(ctx, workspaceObj.Tuning.ConfigTemplate, workspaceObj.Namespace, kubeClient, existingCM)
+err := resources.GetResource(ctx, workspaceObj.Tuning.ConfigTemplate, workspaceObj.Namespace, kubeClient, existingCM)
if err != nil {
if !errors.IsNotFound(err) {
return err
@@ -59,6 +84,10 @@ func EnsureTuningConfigMap(ctx context.Context, workspaceObj *kaitov1alpha1.Work
return nil
}

+releaseNamespace, err := utils.GetReleaseNamespace()
+if err != nil {
+return fmt.Errorf("failed to get release namespace: %v", err)
+}
templateCM := &corev1.ConfigMap{}
err = resources.GetResource(ctx, workspaceObj.Tuning.ConfigTemplate, releaseNamespace, kubeClient, templateCM)
if err != nil {
@@ -80,6 +109,107 @@ func EnsureTuningConfigMap(ctx context.Context, workspaceObj *kaitov1alpha1.Work

func CreatePresetTuning(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace,
tuningObj *model.PresetParam, kubeClient client.Client) (client.Object, error) {
-// TODO
-return nil, nil
initContainers, imagePullSecrets, volumes, volumeMounts, err := prepareDataSource(ctx, workspaceObj, kubeClient)
if err != nil {
return nil, err
}

err = EnsureTuningConfigMap(ctx, workspaceObj, tuningObj, kubeClient)
if err != nil {
return nil, err
}

shmVolume, shmVolumeMount := utils.ConfigSHMVolume(*workspaceObj.Resource.Count)
if shmVolume.Name != "" {
volumes = append(volumes, shmVolume)
}
if shmVolumeMount.Name != "" {
volumeMounts = append(volumeMounts, shmVolumeMount)
}

cmVolume, cmVolumeMount := utils.ConfigCMVolume(workspaceObj.Tuning.ConfigTemplate)
volumes = append(volumes, cmVolume)
volumeMounts = append(volumeMounts, cmVolumeMount)

modelCommand, err := prepareModelRunParameters(ctx, tuningObj)
if err != nil {
return nil, err
}
commands, resourceReq := prepareTuningParameters(ctx, workspaceObj, modelCommand, tuningObj)
tuningImage := GetTuningImageInfo(ctx, workspaceObj, tuningObj)

jobObj := resources.GenerateTuningJobManifest(ctx, workspaceObj, tuningImage, imagePullSecrets, *workspaceObj.Resource.Count, commands,
containerPorts, nil, nil, resourceReq, tolerations, initContainers, volumes, volumeMounts)

err = resources.CreateResource(ctx, jobObj, kubeClient)
if client.IgnoreAlreadyExists(err) != nil {
return nil, err
}
return jobObj, nil
}

// A DataSource can be specified in one of three ways: 1. URL, 2. HostPath (Volume), 3. Image.
func prepareDataSource(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) ([]corev1.Container, []corev1.LocalObjectReference, []corev1.Volume, []corev1.VolumeMount, error) {
var initContainers []corev1.Container
var volumes []corev1.Volume
var volumeMounts []corev1.VolumeMount
var imagePullSecrets []corev1.LocalObjectReference
switch {
case workspaceObj.Tuning.Input.Image != "":
initContainers, volumes, volumeMounts = handleImageDataSource(ctx, workspaceObj)
_, imagePullSecrets = GetDataSrcImageInfo(ctx, workspaceObj)
// TODO: The remaining data sources will be handled in a future PR:
// case len(workspaceObj.Tuning.Input.URLs) > 0:
// case workspaceObj.Tuning.Input.Volume != nil:
}
return initContainers, imagePullSecrets, volumes, volumeMounts, nil
}
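For illustration, a workspace using the Image option might look like the following. The struct shapes are assumptions based only on the field accesses above (Tuning.Input.Image); the image reference is hypothetical:

	ws := &kaitov1alpha1.Workspace{
		Tuning: &kaitov1alpha1.TuningSpec{
			Input: &kaitov1alpha1.DataSource{
				// Dataset baked into a container image under /data (hypothetical reference).
				Image: "myregistry.azurecr.io/dataset:v1",
			},
		},
	}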

func handleImageDataSource(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace) ([]corev1.Container, []corev1.Volume, []corev1.VolumeMount) {
var initContainers []corev1.Container
// Construct a multi-step command that lists the image's /data directory, copies it into the shared data volume, and lists the destination to verify the copy.
command := "ls -la /data && cp -r /data/* " + utils.DefaultDataVolumePath + " && ls -la " + utils.DefaultDataVolumePath
initContainers = append(initContainers, corev1.Container{
Name: "data-extractor",
Image: workspaceObj.Tuning.Input.Image,
Command: []string{"sh", "-c", command},
VolumeMounts: []corev1.VolumeMount{
{
Name: "data-volume",
MountPath: utils.DefaultDataVolumePath,
},
},
})

volumes, volumeMounts := utils.ConfigDataVolume("")
return initContainers, volumes, volumeMounts
}

func prepareModelRunParameters(ctx context.Context, tuningObj *model.PresetParam) (string, error) {
modelCommand := utils.BuildCmdStr(TuningFile, tuningObj.ModelRunParams)
return modelCommand, nil
}

// prepareTuningParameters builds the tuning command in the form
//   baseCommand <TORCH_PARAMS> <TORCH_RDZV_PARAMS> <MODEL_COMMAND>
// (e.g. accelerate launch ... fine_tuning_api.py ...) and sets the GPU
// resources required for tuning. Returns the command and the resource configuration.
func prepareTuningParameters(ctx context.Context, wObj *kaitov1alpha1.Workspace, modelCommand string, tuningObj *model.PresetParam) ([]string, corev1.ResourceRequirements) {
// Set # of processes to GPU Count
numProcesses := getInstanceGPUCount(wObj.Resource.InstanceType)
tuningObj.TorchRunParams["num_processes"] = fmt.Sprintf("%d", numProcesses)
torchCommand := utils.BuildCmdStr(tuningObj.BaseCommand, tuningObj.TorchRunParams)
torchCommand = utils.BuildCmdStr(torchCommand, tuningObj.TorchRunRdzvParams)
commands := utils.ShellCmd(torchCommand + " " + modelCommand)

resourceRequirements := corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(tuningObj.GPUCountRequirement),
},
Limits: corev1.ResourceList{
corev1.ResourceName(resources.CapacityNvidiaGPU): resource.MustParse(tuningObj.GPUCountRequirement),
},
}

return commands, resourceRequirements
}
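To make the assembled command concrete, here is a self-contained, runnable sketch that mimics what utils.BuildCmdStr is assumed to do (append each parameter as a --key=value flag). The helper and all values are hypothetical, for illustration only:

	package main

	import "fmt"

	// buildCmdStr mimics the assumed behavior of utils.BuildCmdStr:
	// append each parameter to the base command as a --key=value flag.
	func buildCmdStr(base string, params map[string]string) string {
		for k, v := range params {
			base += fmt.Sprintf(" --%s=%s", k, v)
		}
		return base
	}

	func main() {
		torch := buildCmdStr("accelerate launch", map[string]string{"num_processes": "2"})
		model := buildCmdStr("fine_tuning_api.py", map[string]string{"model_name": "llama-2-7b"})
		// Prints, e.g.:
		// accelerate launch --num_processes=2 fine_tuning_api.py --model_name=llama-2-7b
		fmt.Println(torch + " " + model)
	}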