From 66bd9bdc0004657a9b3335cf400c56802a172f44 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Wed, 2 Aug 2023 02:42:41 +0900 Subject: [PATCH] Replace Pytorch with PyTorch Signed-off-by: Yuki Iwai --- pkg/apis/kubeflow.org/v1/pytorch_defaults.go | 22 ++++----- pkg/apis/kubeflow.org/v1/pytorch_types.go | 34 +++++++------- .../kubeflow.org/v1/pytorch_validation.go | 4 +- pkg/controller.v1/pytorch/envvar.go | 4 +- pkg/controller.v1/pytorch/hpa.go | 2 +- pkg/controller.v1/pytorch/initcontainer.go | 2 +- .../pytorch/pytorchjob_controller.go | 34 +++++++------- .../pytorch/pytorchjob_controller_test.go | 46 +++++++++---------- pkg/controller.v1/register_controller.go | 2 +- 9 files changed, 75 insertions(+), 75 deletions(-) diff --git a/pkg/apis/kubeflow.org/v1/pytorch_defaults.go b/pkg/apis/kubeflow.org/v1/pytorch_defaults.go index a71625fd4c..1ec2b5f641 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_defaults.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_defaults.go @@ -23,15 +23,15 @@ var ( DefaultNprocPerNode = "auto" ) -func addPytorchDefaultingFuncs(scheme *runtime.Scheme) error { +func addPyTorchDefaultingFuncs(scheme *runtime.Scheme) error { return RegisterDefaults(scheme) } -// setPytorchDefaultPort sets the default ports for pytorch container. -func setPytorchDefaultPort(spec *corev1.PodSpec) { - index := getDefaultContainerIndex(spec, PytorchJobDefaultContainerName) - if ok := hasDefaultPort(spec, index, PytorchJobDefaultPortName); !ok { - setDefaultPort(spec, PytorchJobDefaultPortName, PytorchJobDefaultPort, index) +// setPyTorchDefaultPort sets the default ports for pytorch container. +func setPyTorchDefaultPort(spec *corev1.PodSpec) { + index := getDefaultContainerIndex(spec, PyTorchJobDefaultContainerName) + if ok := hasDefaultPort(spec, index, PyTorchJobDefaultPortName); !ok { + setDefaultPort(spec, PyTorchJobDefaultPortName, PyTorchJobDefaultPort, index) } } @@ -54,8 +54,8 @@ func setElasticPolicy(pytorchJob *PyTorchJob) { } } -// setPytorchTypeNamesToCamelCase sets the name of all replica types from any case to correct case. -func setPytorchTypeNamesToCamelCase(pytorchJob *PyTorchJob) { +// setPyTorchTypeNamesToCamelCase sets the name of all replica types from any case to correct case. +func setPyTorchTypeNamesToCamelCase(pytorchJob *PyTorchJob) { replicaTypes := []ReplicaType{ PyTorchJobReplicaTypeMaster, PyTorchJobReplicaTypeWorker, @@ -81,12 +81,12 @@ func SetDefaults_PyTorchJob(job *PyTorchJob) { } // Update the key of PyTorchReplicaSpecs to camel case. - setPytorchTypeNamesToCamelCase(job) + setPyTorchTypeNamesToCamelCase(job) for _, spec := range job.Spec.PyTorchReplicaSpecs { setDefaultReplicas(spec, 1) - setDefaultRestartPolicy(spec, PytorchJobDefaultRestartPolicy) - setPytorchDefaultPort(&spec.Template.Spec) + setDefaultRestartPolicy(spec, PyTorchJobDefaultRestartPolicy) + setPyTorchDefaultPort(&spec.Template.Spec) } // Set default elastic policy. setElasticPolicy(job) diff --git a/pkg/apis/kubeflow.org/v1/pytorch_types.go b/pkg/apis/kubeflow.org/v1/pytorch_types.go index 8e5b4030b0..59d58c3b90 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_types.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_types.go @@ -20,23 +20,23 @@ import ( ) const ( - // PytorchJobDefaultPortName is name of the port used to communicate between Master and + // PyTorchJobDefaultPortName is name of the port used to communicate between Master and // workers. - PytorchJobDefaultPortName = "pytorchjob-port" - // PytorchJobDefaultContainerName is the name of the PyTorchJob container. - PytorchJobDefaultContainerName = "pytorch" - // PytorchJobDefaultPort is default value of the port. - PytorchJobDefaultPort = 23456 - // PytorchJobDefaultRestartPolicy is default RestartPolicy for PyTorchReplicaSpec. - PytorchJobDefaultRestartPolicy = RestartPolicyOnFailure - // PytorchJobKind is the kind name. - PytorchJobKind = "PyTorchJob" - // PytorchJobPlural is the PytorchPlural for pytorchJob. - PytorchJobPlural = "pytorchjobs" - // PytorchJobSingular is the singular for pytorchJob. - PytorchJobSingular = "pytorchjob" - // PytorchJobFrameworkName is the name of the ML Framework - PytorchJobFrameworkName = "pytorch" + PyTorchJobDefaultPortName = "pytorchjob-port" + // PyTorchJobDefaultContainerName is the name of the PyTorchJob container. + PyTorchJobDefaultContainerName = "pytorch" + // PyTorchJobDefaultPort is default value of the port. + PyTorchJobDefaultPort = 23456 + // PyTorchJobDefaultRestartPolicy is default RestartPolicy for PyTorchReplicaSpec. + PyTorchJobDefaultRestartPolicy = RestartPolicyOnFailure + // PyTorchJobKind is the kind name. + PyTorchJobKind = "PyTorchJob" + // PyTorchJobPlural is the PyTorchPlural for pytorchJob. + PyTorchJobPlural = "pytorchjobs" + // PyTorchJobSingular is the singular for pytorchJob. + PyTorchJobSingular = "pytorchjob" + // PyTorchJobFrameworkName is the name of the ML Framework + PyTorchJobFrameworkName = "pytorch" // PyTorchJobReplicaTypeMaster is the type of Master of distributed PyTorch PyTorchJobReplicaTypeMaster ReplicaType = "Master" // PyTorchJobReplicaTypeWorker is the type for workers of distributed PyTorch. @@ -168,5 +168,5 @@ type PyTorchJobList struct { func init() { SchemeBuilder.Register(&PyTorchJob{}, &PyTorchJobList{}) - SchemeBuilder.SchemeBuilder.Register(addPytorchDefaultingFuncs) + SchemeBuilder.SchemeBuilder.Register(addPyTorchDefaultingFuncs) } diff --git a/pkg/apis/kubeflow.org/v1/pytorch_validation.go b/pkg/apis/kubeflow.org/v1/pytorch_validation.go index 19932d6834..752b8196df 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_validation.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_validation.go @@ -70,13 +70,13 @@ func validatePyTorchReplicaSpecs(specs map[ReplicaType]*ReplicaSpec) error { msg := fmt.Sprintf("PyTorchJobSpec is not valid: Image is undefined in the container of %v", rType) return fmt.Errorf(msg) } - if container.Name == PytorchJobDefaultContainerName { + if container.Name == PyTorchJobDefaultContainerName { defaultContainerPresent = true } } //Make sure there has at least one container named "pytorch" if !defaultContainerPresent { - msg := fmt.Sprintf("PyTorchJobSpec is not valid: There is no container named %s in %v", PytorchJobDefaultContainerName, rType) + msg := fmt.Sprintf("PyTorchJobSpec is not valid: There is no container named %s in %v", PyTorchJobDefaultContainerName, rType) return fmt.Errorf(msg) } if rType == PyTorchJobReplicaTypeMaster { diff --git a/pkg/controller.v1/pytorch/envvar.go b/pkg/controller.v1/pytorch/envvar.go index e8e4538aff..1529500761 100644 --- a/pkg/controller.v1/pytorch/envvar.go +++ b/pkg/controller.v1/pytorch/envvar.go @@ -152,10 +152,10 @@ func replicaName(jobName string, rtype kubeflowv1.ReplicaType, index int) string func getPortFromPyTorchJob(job *kubeflowv1.PyTorchJob, rtype kubeflowv1.ReplicaType) (int32, error) { containers := job.Spec.PyTorchReplicaSpecs[rtype].Template.Spec.Containers for _, container := range containers { - if container.Name == kubeflowv1.PytorchJobDefaultContainerName { + if container.Name == kubeflowv1.PyTorchJobDefaultContainerName { ports := container.Ports for _, port := range ports { - if port.Name == kubeflowv1.PytorchJobDefaultPortName { + if port.Name == kubeflowv1.PyTorchJobDefaultPortName { return port.ContainerPort, nil } } diff --git a/pkg/controller.v1/pytorch/hpa.go b/pkg/controller.v1/pytorch/hpa.go index 306f5a924b..e8846600cc 100644 --- a/pkg/controller.v1/pytorch/hpa.go +++ b/pkg/controller.v1/pytorch/hpa.go @@ -31,7 +31,7 @@ import ( ) func (r *PyTorchJobReconciler) ReconcileHPA(pytorchJob *kubeflowv1.PyTorchJob) error { - logger := r.Log.WithValues(kubeflowv1.PytorchJobSingular, pytorchJob.Name) + logger := r.Log.WithValues(kubeflowv1.PyTorchJobSingular, pytorchJob.Name) if pytorchJob.Spec.ElasticPolicy == nil || pytorchJob.Spec.ElasticPolicy.Metrics == nil { logger.V(1).Info( diff --git a/pkg/controller.v1/pytorch/initcontainer.go b/pkg/controller.v1/pytorch/initcontainer.go index 8c36e06a31..cab6b81657 100644 --- a/pkg/controller.v1/pytorch/initcontainer.go +++ b/pkg/controller.v1/pytorch/initcontainer.go @@ -108,7 +108,7 @@ func setInitContainer(obj interface{}, podTemplate *corev1.PodTemplateSpec, if !ok { return fmt.Errorf("%+v is not a type of PyTorchJob", obj) } - logger := log.WithValues(kubeflowv1.PytorchJobSingular, types.NamespacedName{ + logger := log.WithValues(kubeflowv1.PyTorchJobSingular, types.NamespacedName{ Namespace: pytorchJob.Namespace, Name: pytorchJob.Name, }) diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller.go b/pkg/controller.v1/pytorch/pytorchjob_controller.go index 71ff68ba7b..6ed4694a93 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller.go @@ -123,7 +123,7 @@ type PyTorchJobReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { _ = log.FromContext(ctx) - logger := r.Log.WithValues(kubeflowv1.PytorchJobSingular, req.NamespacedName) + logger := r.Log.WithValues(kubeflowv1.PyTorchJobSingular, req.NamespacedName) pytorchjob := &kubeflowv1.PyTorchJob{} err := r.Get(ctx, req.NamespacedName, pytorchjob) @@ -134,7 +134,7 @@ func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err = kubeflowv1.ValidateV1PyTorchJob(pytorchjob); err != nil { logger.Error(err, "PyTorchJob failed validation") - r.Recorder.Eventf(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedValidationReason), + r.Recorder.Eventf(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobFailedValidationReason), "PyTorchJob failed validation because %s", err) return ctrl.Result{}, err } @@ -242,7 +242,7 @@ func (r *PyTorchJobReconciler) ControllerName() string { } func (r *PyTorchJobReconciler) GetAPIGroupVersionKind() schema.GroupVersionKind { - return kubeflowv1.GroupVersion.WithKind(kubeflowv1.PytorchJobKind) + return kubeflowv1.GroupVersion.WithKind(kubeflowv1.PyTorchJobKind) } func (r *PyTorchJobReconciler) GetAPIGroupVersion() schema.GroupVersion { @@ -254,7 +254,7 @@ func (r *PyTorchJobReconciler) GetGroupNameLabelValue() string { } func (r *PyTorchJobReconciler) GetFrameworkName() string { - return kubeflowv1.PytorchJobFrameworkName + return kubeflowv1.PyTorchJobFrameworkName } func (r *PyTorchJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) { @@ -392,18 +392,18 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if rtype == kubeflowv1.PyTorchJobReplicaTypeMaster { if running > 0 { msg := fmt.Sprintf("PyTorchJob %s is running.", pytorchjob.Name) - commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRunningReason), msg) } // when master is succeed, the job is finished. if expected == 0 { msg := fmt.Sprintf("PyTorchJob %s is successfully completed.", pytorchjob.Name) logrus.Info(msg) - r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) + r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg) trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) return nil } @@ -417,18 +417,18 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if expected == 0 || (pytorchjob.Spec.ElasticPolicy != nil && succeeded > 0) { msg := fmt.Sprintf("PyTorchJob %s/%s successfully completed.", pytorchjob.Namespace, pytorchjob.Name) - r.recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) + r.recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg) trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) } else if running > 0 { // Some workers are still running, leave a running condition. msg := fmt.Sprintf("PyTorchJob %s/%s is running.", pytorchjob.Namespace, pytorchjob.Name) - commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRunningReason), msg) } } } @@ -436,17 +436,17 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if failed > 0 && (specReplicas > succeeded+running) { if spec.RestartPolicy != kubeflowv1.RestartPolicyNever { msg := fmt.Sprintf("PyTorchJob %s is restarting because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype) - r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) - commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) + r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRestartingReason), msg) + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRestartingReason), msg) trainingoperatorcommon.RestartedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) } else { msg := fmt.Sprintf("PyTorchJob %s is failed because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype) - r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg) + r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobFailedReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg) + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobFailedReason), msg) trainingoperatorcommon.FailedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) } } @@ -505,11 +505,11 @@ func (r *PyTorchJobReconciler) SetClusterSpec(job interface{}, podTemplate *core } func (r *PyTorchJobReconciler) GetDefaultContainerName() string { - return kubeflowv1.PytorchJobDefaultContainerName + return kubeflowv1.PyTorchJobDefaultContainerName } func (r *PyTorchJobReconciler) GetDefaultContainerPortName() string { - return kubeflowv1.PytorchJobDefaultPortName + return kubeflowv1.PyTorchJobDefaultPortName } func (r *PyTorchJobReconciler) IsMasterRole(replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, @@ -528,7 +528,7 @@ func (r *PyTorchJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool msg := fmt.Sprintf("PyTorchJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) - commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), msg) + commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobCreatedReason), msg) return true } } diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go index 07c147c4eb..2065f9c410 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go @@ -77,10 +77,10 @@ var _ = Describe("PyTorchJob controller", func() { Containers: []corev1.Container{ { Image: "test-image", - Name: kubeflowv1.PytorchJobDefaultContainerName, + Name: kubeflowv1.PyTorchJobDefaultContainerName, Ports: []corev1.ContainerPort{ { - Name: kubeflowv1.PytorchJobDefaultPortName, + Name: kubeflowv1.PyTorchJobDefaultPortName, ContainerPort: expectedPort, Protocol: corev1.ProtocolTCP, }, @@ -97,10 +97,10 @@ var _ = Describe("PyTorchJob controller", func() { Containers: []corev1.Container{ { Image: "test-image", - Name: kubeflowv1.PytorchJobDefaultContainerName, + Name: kubeflowv1.PyTorchJobDefaultContainerName, Ports: []corev1.ContainerPort{ { - Name: kubeflowv1.PytorchJobDefaultPortName, + Name: kubeflowv1.PyTorchJobDefaultPortName, ContainerPort: expectedPort, Protocol: corev1.ProtocolTCP, }, @@ -142,7 +142,7 @@ var _ = Describe("PyTorchJob controller", func() { // Check the pod port. Expect(masterPod.Spec.Containers[0].Ports).To(ContainElement(corev1.ContainerPort{ - Name: kubeflowv1.PytorchJobDefaultPortName, + Name: kubeflowv1.PyTorchJobDefaultPortName, ContainerPort: expectedPort, Protocol: corev1.ProtocolTCP})) // Check env variable @@ -162,7 +162,7 @@ var _ = Describe("PyTorchJob controller", func() { trueVal := true Expect(masterPod.OwnerReferences).To(ContainElement(metav1.OwnerReference{ APIVersion: kubeflowv1.SchemeGroupVersion.String(), - Kind: kubeflowv1.PytorchJobKind, + Kind: kubeflowv1.PyTorchJobKind, Name: name, UID: created.UID, Controller: &trueVal, @@ -170,7 +170,7 @@ var _ = Describe("PyTorchJob controller", func() { })) Expect(masterSvc.OwnerReferences).To(ContainElement(metav1.OwnerReference{ APIVersion: kubeflowv1.SchemeGroupVersion.String(), - Kind: kubeflowv1.PytorchJobKind, + Kind: kubeflowv1.PyTorchJobKind, Name: name, UID: created.UID, Controller: &trueVal, @@ -237,13 +237,13 @@ var _ = Describe("PyTorchJob controller", func() { { Type: kubeflowv1.JobCreated, Status: corev1.ConditionTrue, - Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Reason: commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobCreatedReason), Message: fmt.Sprintf("PyTorchJob %s is created.", name), }, { Type: kubeflowv1.JobSuspended, Status: corev1.ConditionTrue, - Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSuspendedReason), + Reason: commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSuspendedReason), Message: fmt.Sprintf("PyTorchJob %s is suspended.", name), }, }, testutil.IgnoreJobConditionsTimes)) @@ -306,18 +306,18 @@ var _ = Describe("PyTorchJob controller", func() { { Type: kubeflowv1.JobCreated, Status: corev1.ConditionTrue, - Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Reason: commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobCreatedReason), Message: fmt.Sprintf("PyTorchJob %s is created.", name), }, { Type: kubeflowv1.JobRunning, Status: corev1.ConditionTrue, - Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), + Reason: commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRunningReason), Message: fmt.Sprintf("PyTorchJob %s is running.", name), }, }, testutil.IgnoreJobConditionsTimes)) - By("Updating the PytorchJob with suspend=true") + By("Updating the PyTorchJob with suspend=true") Eventually(func() error { Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) created.Spec.RunPolicy.Suspend = pointer.Bool(true) @@ -361,18 +361,18 @@ var _ = Describe("PyTorchJob controller", func() { { Type: kubeflowv1.JobCreated, Status: corev1.ConditionTrue, - Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Reason: commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobCreatedReason), Message: fmt.Sprintf("PyTorchJob %s is created.", name), }, { Type: kubeflowv1.JobRunning, Status: corev1.ConditionFalse, - Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSuspendedReason), + Reason: commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSuspendedReason), Message: fmt.Sprintf("PyTorchJob %s is suspended.", name), }, { Type: kubeflowv1.JobSuspended, - Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSuspendedReason), + Reason: commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSuspendedReason), Message: fmt.Sprintf("PyTorchJob %s is suspended.", name), Status: corev1.ConditionTrue, }, @@ -423,19 +423,19 @@ var _ = Describe("PyTorchJob controller", func() { { Type: kubeflowv1.JobCreated, Status: corev1.ConditionTrue, - Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Reason: commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobCreatedReason), Message: fmt.Sprintf("PyTorchJob %s is created.", name), }, { Type: kubeflowv1.JobSuspended, - Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobResumedReason), + Reason: commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobResumedReason), Message: fmt.Sprintf("PyTorchJob %s is resumed.", name), Status: corev1.ConditionFalse, }, { Type: kubeflowv1.JobRunning, Status: corev1.ConditionTrue, - Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), + Reason: commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRunningReason), Message: fmt.Sprintf("PyTorchJob %s is running.", name), }, }, testutil.IgnoreJobConditionsTimes)) @@ -499,10 +499,10 @@ var _ = Describe("PyTorchJob controller", func() { Containers: []corev1.Container{ { Image: "test-image", - Name: kubeflowv1.PytorchJobDefaultContainerName, + Name: kubeflowv1.PyTorchJobDefaultContainerName, Ports: []corev1.ContainerPort{ { - Name: kubeflowv1.PytorchJobDefaultPortName, + Name: kubeflowv1.PyTorchJobDefaultPortName, ContainerPort: expectedPort, Protocol: corev1.ProtocolTCP, }, @@ -550,7 +550,7 @@ var _ = Describe("PyTorchJob controller", func() { // Check pod port. Expect(pod.Spec.Containers[0].Ports).To(ContainElement(corev1.ContainerPort{ - Name: kubeflowv1.PytorchJobDefaultPortName, + Name: kubeflowv1.PyTorchJobDefaultPortName, ContainerPort: expectedPort, Protocol: corev1.ProtocolTCP})) // Check environment variables. @@ -572,7 +572,7 @@ var _ = Describe("PyTorchJob controller", func() { trueVal := true Expect(pod.OwnerReferences).To(ContainElement(metav1.OwnerReference{ APIVersion: kubeflowv1.SchemeGroupVersion.String(), - Kind: kubeflowv1.PytorchJobKind, + Kind: kubeflowv1.PyTorchJobKind, Name: name, UID: created.UID, Controller: &trueVal, @@ -580,7 +580,7 @@ var _ = Describe("PyTorchJob controller", func() { })) Expect(svc.OwnerReferences).To(ContainElement(metav1.OwnerReference{ APIVersion: kubeflowv1.SchemeGroupVersion.String(), - Kind: kubeflowv1.PytorchJobKind, + Kind: kubeflowv1.PyTorchJobKind, Name: name, UID: created.UID, Controller: &trueVal, diff --git a/pkg/controller.v1/register_controller.go b/pkg/controller.v1/register_controller.go index d7375a6e24..e38f68b087 100644 --- a/pkg/controller.v1/register_controller.go +++ b/pkg/controller.v1/register_controller.go @@ -38,7 +38,7 @@ var SupportedSchemeReconciler = map[string]ReconcilerSetupFunc{ kubeflowv1.TFJobKind: func(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc, controllerThreads int) error { return tensorflowcontroller.NewReconciler(mgr, gangSchedulingSetupFunc).SetupWithManager(mgr, controllerThreads) }, - kubeflowv1.PytorchJobKind: func(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc, controllerThreads int) error { + kubeflowv1.PyTorchJobKind: func(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc, controllerThreads int) error { return pytorchcontroller.NewReconciler(mgr, gangSchedulingSetupFunc).SetupWithManager(mgr, controllerThreads) }, kubeflowv1.MXJobKind: func(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc, controllerThreads int) error {