From 383e6b78d94e31e0541dd9b7d71421ac7f159d3e Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Sun, 9 Jul 2023 23:02:42 +0900 Subject: [PATCH] Implement suspend semantics to PyTorchJob Signed-off-by: Yuki Iwai --- manifests/base/crds/kubeflow.org_mpijobs.yaml | 10 + manifests/base/crds/kubeflow.org_mxjobs.yaml | 10 + .../base/crds/kubeflow.org_paddlejobs.yaml | 10 + .../base/crds/kubeflow.org_pytorchjobs.yaml | 10 + manifests/base/crds/kubeflow.org_tfjobs.yaml | 10 + .../base/crds/kubeflow.org_xgboostjobs.yaml | 10 + pkg/controller.v1/common/job.go | 93 +++-- pkg/controller.v1/common/pod.go | 2 +- pkg/controller.v1/mpi/mpijob_controller.go | 10 +- pkg/controller.v1/mxnet/mxjob_controller.go | 15 +- .../paddlepaddle/paddlepaddle_controller.go | 22 +- pkg/controller.v1/pytorch/hpa.go | 32 +- .../pytorch/pytorchjob_controller.go | 27 +- .../pytorch/pytorchjob_controller_test.go | 390 ++++++++++++++++-- pkg/controller.v1/tensorflow/job_test.go | 3 +- .../tensorflow/tfjob_controller.go | 19 +- pkg/controller.v1/xgboost/status.go | 3 +- .../xgboost/xgboostjob_controller.go | 9 +- pkg/reconciler.v1/common/job.go | 17 +- pkg/util/status.go | 25 +- pkg/util/status_test.go | 34 +- pkg/util/testutil/constants.go | 17 +- pkg/util/train/train_util.go | 10 + pkg/util/train/train_util_test.go | 46 ++- 24 files changed, 695 insertions(+), 139 deletions(-) diff --git a/manifests/base/crds/kubeflow.org_mpijobs.yaml b/manifests/base/crds/kubeflow.org_mpijobs.yaml index a923e49f6c..06fb5a970b 100644 --- a/manifests/base/crds/kubeflow.org_mpijobs.yaml +++ b/manifests/base/crds/kubeflow.org_mpijobs.yaml @@ -7702,6 +7702,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/manifests/base/crds/kubeflow.org_mxjobs.yaml b/manifests/base/crds/kubeflow.org_mxjobs.yaml index 682e1f04bb..99be8c1f19 100644 --- a/manifests/base/crds/kubeflow.org_mxjobs.yaml +++ b/manifests/base/crds/kubeflow.org_mxjobs.yaml @@ -7701,6 +7701,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. 
It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/manifests/base/crds/kubeflow.org_paddlejobs.yaml b/manifests/base/crds/kubeflow.org_paddlejobs.yaml index 96df952bb6..32cefe8517 100644 --- a/manifests/base/crds/kubeflow.org_paddlejobs.yaml +++ b/manifests/base/crds/kubeflow.org_paddlejobs.yaml @@ -8212,6 +8212,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml index afe8ab040a..8744e51480 100644 --- a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml +++ b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml @@ -8241,6 +8241,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/manifests/base/crds/kubeflow.org_tfjobs.yaml b/manifests/base/crds/kubeflow.org_tfjobs.yaml index 9e85cccc0a..b62e42b19c 100644 --- a/manifests/base/crds/kubeflow.org_tfjobs.yaml +++ b/manifests/base/crds/kubeflow.org_tfjobs.yaml @@ -87,6 +87,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/manifests/base/crds/kubeflow.org_xgboostjobs.yaml b/manifests/base/crds/kubeflow.org_xgboostjobs.yaml index 1b5a8f0ed7..92f6c960c7 100644 --- a/manifests/base/crds/kubeflow.org_xgboostjobs.yaml +++ b/manifests/base/crds/kubeflow.org_xgboostjobs.yaml @@ -83,6 +83,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. 
the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/pkg/controller.v1/common/job.go b/pkg/controller.v1/common/job.go index a218cfb3be..35a3e4e2f2 100644 --- a/pkg/controller.v1/common/job.go +++ b/pkg/controller.v1/common/job.go @@ -26,6 +26,7 @@ import ( "github.com/kubeflow/training-operator/pkg/core" commonutil "github.com/kubeflow/training-operator/pkg/util" "github.com/kubeflow/training-operator/pkg/util/k8sutil" + trainutil "github.com/kubeflow/training-operator/pkg/util/train" log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" @@ -38,13 +39,13 @@ import ( volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" ) -func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job interface{}, pods []*corev1.Pod) error { +func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, runtimeObject runtime.Object, pods []*corev1.Pod) error { if len(pods) == 0 { return nil } - // Delete nothing when the cleanPodPolicy is None. - if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone { + // Delete nothing when the cleanPodPolicy is None and the runPolicy.suspend is false. + if !trainutil.IsJobSuspended(runPolicy) && *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone { return nil } @@ -55,11 +56,11 @@ func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job i if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending { continue } - if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil { + if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject); err != nil { return err } // Pod and service have the same name, thus the service could be deleted using pod's name. - if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil { + if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, runtimeObject); err != nil { return err } } @@ -118,22 +119,8 @@ func (jc *JobController) ReconcileJobs( oldStatus := jobStatus.DeepCopy() if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) { - // If the Job is succeed or failed, delete all pods and services. - if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil { - return err - } - - if jc.Config.EnableGangScheduling() { - jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup") - if err := jc.DeletePodGroup(metaObject); err != nil { - jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err) - return err - } else { - jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", jobName) - } - } - - if err := jc.CleanupJob(runPolicy, jobStatus, job); err != nil { + // If the Job is succeed or failed, delete all pods, services, and podGroup. 
+ if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil { return err } @@ -155,6 +142,40 @@ func (jc *JobController) ReconcileJobs( return nil } + if trainutil.IsJobSuspended(runPolicy) { + if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil { + return err + } + for rType := range jobStatus.ReplicaStatuses { + jobStatus.ReplicaStatuses[rType].Active = 0 + } + jobStatus.StartTime = nil + msg := fmt.Sprintf("%s %s is suspended.", jc.Controller.GetAPIGroupVersionKind().Kind, jobName) + if commonutil.IsRunning(jobStatus) { + if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse, + commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil { + return err + } + } + if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue, + commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil { + return err + } + jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg) + if !reflect.DeepEqual(*oldStatus, jobStatus) { + return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus) + } + return nil + } + if !trainutil.IsJobSuspended(runPolicy) && commonutil.IsSuspend(jobStatus) { + msg := fmt.Sprintf("%s %s is resumed.", jc.Controller.GetAPIGroupVersionKind().Kind, jobName) + if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse, + commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg); err != nil { + return err + } + jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg) + } + // retrieve the previous number of retry previousRetry := jc.WorkQueue.NumRequeues(jobKey) @@ -205,7 +226,7 @@ func (jc *JobController) ReconcileJobs( // If the Job exceeds backoff limit or is past active deadline // delete all pods and services, then set the status to failed - if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil { + if err := jc.DeletePodsAndServices(runPolicy, runtimeObject, pods); err != nil { return err } @@ -225,7 +246,7 @@ func (jc *JobController) ReconcileJobs( jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) - if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil { + if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil { log.Infof("Append job condition error: %v", err) return err } @@ -344,6 +365,32 @@ func (jc *JobController) ReconcileJobs( return nil } +func (jc *JobController) CleanUpResources( + runPolicy *apiv1.RunPolicy, + runtimeObject runtime.Object, + metaObject metav1.Object, + jobStatus apiv1.JobStatus, + pods []*v1.Pod, +) error { + if err := jc.DeletePodsAndServices(runPolicy, runtimeObject, pods); err != nil { + return err + } + if jc.Config.EnableGangScheduling() { + + jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. 
Deleting PodGroup") + if err := jc.DeletePodGroup(metaObject); err != nil { + jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err) + return err + } else { + jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", metaObject.GetName()) + } + } + if err := jc.CleanupJob(runPolicy, jobStatus, runtimeObject); err != nil { + return err + } + return nil +} + // ResetExpectations reset the expectation for creates and deletes of pod/service to zero. func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error { var allErrs error diff --git a/pkg/controller.v1/common/pod.go b/pkg/controller.v1/common/pod.go index 7ade04d81b..be9d3e0c17 100644 --- a/pkg/controller.v1/common/pod.go +++ b/pkg/controller.v1/common/pod.go @@ -361,7 +361,7 @@ func (jc *JobController) ReconcilePods( metaObject.GetName(), rType) jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg) - if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, + if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, v1.ConditionTrue, commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg); err != nil { commonutil.LoggerForJob(metaObject).Infof("Append job condition error: %v", err) return err diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go index 9a520d55e2..0b32e4268f 100644 --- a/pkg/controller.v1/mpi/mpijob_controller.go +++ b/pkg/controller.v1/mpi/mpijob_controller.go @@ -320,7 +320,7 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, + if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg); err != nil { log.Log.Error(err, "append job condition error") return false @@ -583,7 +583,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl if rtype == kubeflowv1.MPIJobReplicaTypeLauncher { if running > 0 { msg := fmt.Sprintf("MPIJob %s is running.", mpiJob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) return err @@ -598,7 +598,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg) if err != nil { 
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) return err @@ -611,7 +611,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { msg := fmt.Sprintf("MPIJob %s is restarting because %d %s replica(s) failed.", mpiJob.Name, failed, rtype) jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg) if err != nil { commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) return err @@ -624,7 +624,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)), msg) if err != nil { commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) diff --git a/pkg/controller.v1/mxnet/mxjob_controller.go b/pkg/controller.v1/mxnet/mxjob_controller.go index 680ef313b0..24d71f169f 100644 --- a/pkg/controller.v1/mxnet/mxjob_controller.go +++ b/pkg/controller.v1/mxnet/mxjob_controller.go @@ -371,7 +371,8 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow if rtype == kubeflowv1.MXJobReplicaTypeScheduler || singleTraining { if running > 0 { msg := fmt.Sprintf("MXJob %s is running.", mxjob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRunningReason), msg) if err != nil { logrus.Infof("Append mxjob condition error: %v", err) return err @@ -385,7 +386,8 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg) if err != nil { logrus.Infof("Append mxjob condition error: %v", err) return err @@ -398,7 +400,8 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { msg := fmt.Sprintf("mxjob %s is restarting because %d %s replica(s) failed.", mxjob.Name, failed, rtype) r.Recorder.Event(mxjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, 
kubeflowv1.JobRestarting, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg) if err != nil { logrus.Infof("Append job condition error: %v", err) return err @@ -411,7 +414,8 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg) if err != nil { logrus.Infof("Append job condition error: %v", err) return err @@ -478,7 +482,8 @@ func (r *MXJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("MXJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(mxJob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobCreatedReason), msg); err != nil { + if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobCreatedReason), msg); err != nil { logrus.Error(err, "append job condition error") return false } diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go index e94c31b38d..ed1b33b47c 100644 --- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go +++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go @@ -393,7 +393,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, if rtype == kubeflowv1.PaddleJobReplicaTypeMaster { if running > 0 { msg := fmt.Sprintf("PaddleJob %s is running.", paddlejob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -408,7 +409,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -428,8 +430,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append 
paddlejob condition error: %v", err) return err @@ -439,7 +441,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, // Some workers are still running, leave a running condition. msg := fmt.Sprintf("PaddleJob %s/%s is running.", paddlejob.Namespace, paddlejob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append paddlejob condition error: %v", err) return err @@ -452,7 +455,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, if spec.RestartPolicy != kubeflowv1.RestartPolicyNever { msg := fmt.Sprintf("PaddleJob %s is restarting because %d %s replica(s) failed.", paddlejob.Name, failed, rtype) r.Recorder.Event(paddlejob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -465,7 +469,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobFailedReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobFailedReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -550,7 +555,8 @@ func (r *PaddleJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("PaddleJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&paddlejob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobCreatedReason), msg); err != nil { + if err := commonutil.UpdateJobConditions(&paddlejob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobCreatedReason), msg); err != nil { logrus.Error(err, "append job condition error") return false } diff --git a/pkg/controller.v1/pytorch/hpa.go b/pkg/controller.v1/pytorch/hpa.go index 425eb0051f..d02fe4b9f3 100644 --- a/pkg/controller.v1/pytorch/hpa.go +++ b/pkg/controller.v1/pytorch/hpa.go @@ -17,13 +17,15 @@ package pytorch import ( "context" + trainutil "github.com/kubeflow/training-operator/pkg/util/train" autoscalingv2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" kubeflowv1 
"github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" ) @@ -45,21 +47,25 @@ func (r *PyTorchJobReconciler) ReconcileHPA(pytorchJob *kubeflowv1.PyTorchJob) e return err } - if err := r.Get(context.TODO(), types.NamespacedName{ - Name: pytorchJob.Name, - Namespace: pytorchJob.Namespace, - }, current); err != nil { - if !errors.IsNotFound(err) { - return err + err = r.Get(context.TODO(), client.ObjectKeyFromObject(expected), current) + if err != nil { + if errors.IsNotFound(err) && !trainutil.IsJobSuspended(&pytorchJob.Spec.RunPolicy) { + // Create the new HPA. + logger.V(1).Info("Creating HPA", "namespace", expected.Namespace, "name", expected.Name) + err = r.Create(context.TODO(), expected) + if err != nil { + return err + } + return nil } - - // Create the new HPA. - logger.V(1).Info("Creating HPA", "namespace", expected.Namespace, "name", expected.Name) - err = r.Create(context.TODO(), expected) - if err != nil { + return err + } + if trainutil.IsJobSuspended(&pytorchJob.Spec.RunPolicy) { + // Delete the current HPA + logger.V(1).Info("Deleting HPA", "HorizontalPodAutoscaler", klog.KObj(current)) + if err = r.Delete(context.TODO(), current); err != nil { return err } - return nil } if !equality.Semantic.DeepEqual(expected.Spec, current.Spec) { diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller.go b/pkg/controller.v1/pytorch/pytorchjob_controller.go index 6344757cc2..d7bbe8f830 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller.go @@ -27,6 +27,7 @@ import ( "github.com/kubeflow/training-operator/pkg/controller.v1/control" "github.com/kubeflow/training-operator/pkg/controller.v1/expectation" commonutil "github.com/kubeflow/training-operator/pkg/util" + trainutil "github.com/kubeflow/training-operator/pkg/util/train" "github.com/go-logr/logr" "github.com/sirupsen/logrus" @@ -363,8 +364,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, logger := commonutil.LoggerForJob(pytorchjob) - // Set StartTime. - if jobStatus.StartTime == nil { + if !trainutil.IsJobSuspended(&pytorchjob.Spec.RunPolicy) && jobStatus.StartTime == nil { + // Set StartTime. 
now := metav1.Now() jobStatus.StartTime = &now // enqueue a sync to check if job past ActiveDeadlineSeconds @@ -392,7 +393,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if rtype == kubeflowv1.PyTorchJobReplicaTypeMaster { if running > 0 { msg := fmt.Sprintf("PyTorchJob %s is running.", pytorchjob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -407,7 +409,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -430,8 +433,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append pytorchjob condition error: %v", err) return err @@ -441,7 +444,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, // Some workers are still running, leave a running condition. 
msg := fmt.Sprintf("PyTorchJob %s/%s is running.", pytorchjob.Namespace, pytorchjob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append pytorchjob condition error: %v", err) return err @@ -454,7 +458,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if spec.RestartPolicy != kubeflowv1.RestartPolicyNever { msg := fmt.Sprintf("PyTorchJob %s is restarting because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype) r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -467,7 +472,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -553,7 +559,8 @@ func (r *PyTorchJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool msg := fmt.Sprintf("PyTorchJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), msg); err != nil { + if err := commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), msg); err != nil { logrus.Error(err, "append job condition error") return false } diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go index 81b2a29556..733af91d0c 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go @@ -20,12 +20,17 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + autoscalingv2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + commonutil "github.com/kubeflow/training-operator/pkg/util" "github.com/kubeflow/training-operator/pkg/util/testutil" ) @@ -36,14 +41,33 @@ var _ = Describe("PyTorchJob controller", func() { ) Context("When creating the PyTorchJob", func() { - It("Should get the corresponding resources successfully", func() { - const ( - namespace = "default" - name = "test-job" - ) - By("By creating a new PyTorchJob") - ctx := context.Background() - job := newPyTorchJobForTest(name, namespace) + const name = "test-job" + var ( + ns *corev1.Namespace + job *kubeflowv1.PyTorchJob + jobKey types.NamespacedName + masterKey types.NamespacedName + worker0Key types.NamespacedName + ctx = context.Background() + ) + BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pytorch-test-", + }, + } + Expect(testK8sClient.Create(ctx, ns)).Should(Succeed()) + + job = newPyTorchJobForTest(name, ns.Name) + jobKey = client.ObjectKeyFromObject(job) + masterKey = types.NamespacedName{ + Name: fmt.Sprintf("%s-master-0", name), + Namespace: ns.Name, + } + worker0Key = types.NamespacedName{ + Name: fmt.Sprintf("%s-worker-0", name), + Namespace: ns.Name, + } job.Spec.PyTorchReplicaSpecs = map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{ kubeflowv1.PyTorchJobReplicaTypeMaster: { Replicas: pointer.Int32(1), @@ -86,19 +110,23 @@ var _ = Describe("PyTorchJob controller", func() { }, }, } - + }) + AfterEach(func() { + Expect(testK8sClient.Delete(ctx, job)).Should(Succeed()) + Expect(testK8sClient.Delete(ctx, ns)).Should(Succeed()) + }) + It("Should get the corresponding resources successfully", func() { + By("By creating a new PyTorchJob") Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) - key := types.NamespacedName{Name: name, Namespace: namespace} created := &kubeflowv1.PyTorchJob{} // We'll need to retry getting this newly created PyTorchJob, given that creation may not immediately happen. Eventually(func() bool { - err := testK8sClient.Get(ctx, key, created) + err := testK8sClient.Get(ctx, jobKey, created) return err == nil }, testutil.Timeout, testutil.Interval).Should(BeTrue()) - masterKey := types.NamespacedName{Name: fmt.Sprintf("%s-master-0", name), Namespace: namespace} masterPod := &corev1.Pod{} Eventually(func() bool { err := testK8sClient.Get(ctx, masterKey, masterPod) @@ -150,7 +178,7 @@ var _ = Describe("PyTorchJob controller", func() { masterPod.ResourceVersion = "" Expect(testK8sClient.Status().Update(ctx, masterPod)).Should(Succeed()) Eventually(func() bool { - err := testK8sClient.Get(ctx, key, created) + err := testK8sClient.Get(ctx, jobKey, created) if err != nil { return false } @@ -160,32 +188,279 @@ var _ = Describe("PyTorchJob controller", func() { // Check if the job is succeeded. 
cond := getCondition(created.Status, kubeflowv1.JobSucceeded) Expect(cond.Status).To(Equal(corev1.ConditionTrue)) - By("Deleting the PyTorchJob") - Expect(testK8sClient.Delete(ctx, job)).Should(Succeed()) + }) + + It("Shouldn't create resources when PyTorchJob is suspended; Should create resources once PyTorchJob is unsuspended", func() { + By("By creating a new PyTorchJob with suspend=true") + job.Spec.RunPolicy.Suspend = pointer.Bool(true) + job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas = pointer.Int32(1) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.PyTorchJob{} + masterPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + masterSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + + By("Checking created PyTorchJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Checking if the pods and services aren't created") + Consistently(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + + By("Checking if the PyTorchJob has suspended condition") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.ConsistentDuration, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("PyTorchJob %s is created.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSuspendedReason), + Message: fmt.Sprintf("PyTorchJob %s is suspended.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + + By("Unsuspending the PyTorchJob") + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = pointer.Bool(false) + Expect(testK8sClient.Update(ctx, created)).Should(Succeed()) + + By("Check if the pods and services are created") + Eventually(func() error { + return testK8sClient.Get(ctx, masterKey, masterPod) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, worker0Key, workerPod) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, masterKey, masterSvc) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, worker0Key, workerSvc) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + + By("Updating Pod's condition with running") + masterPod.Status.Phase = corev1.PodRunning + masterPod.ResourceVersion = "" + Expect(testK8sClient.Status().Update(ctx, masterPod)).Should(Succeed()) + workerPod.Status.Phase = corev1.PodRunning + workerPod.ResourceVersion = "" + 
Expect(testK8sClient.Status().Update(ctx, workerPod)).Should(Succeed()) + + By("Checking the PyTorchJob has resumed conditions") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("PyTorchJob %s is created.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Status: corev1.ConditionFalse, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobResumedReason), + Message: fmt.Sprintf("PyTorchJob %s is resumed.", name), + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), + Message: fmt.Sprintf("PyTorchJob %s is running.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + }) + + It("Should delete resources after PyTorchJob is suspended", func() { + By("By creating a new PyTorchJob") + job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas = pointer.Int32(1) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.PyTorchJob{} + masterPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + masterSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + + // We'll need to retry getting this newly created PyTorchJob, given that creation may not immediately happen. + By("Checking created PyTorchJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Eventually(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.Timeout, testutil.Interval).ShouldNot(BeNil()) + + By("Checking the created pods and services") + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errMaster == nil && errWorker == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errMaster == nil && errWorker == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Updating the pod's phase with Running") + masterPod.Status.Phase = corev1.PodRunning + masterPod.ResourceVersion = "" + Expect(testK8sClient.Status().Update(ctx, masterPod)).Should(Succeed()) + workerPod.Status.Phase = corev1.PodRunning + workerPod.ResourceVersion = "" + Expect(testK8sClient.Status().Update(ctx, workerPod)).Should(Succeed()) + + By("Checking the PyTorchJob's condition") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("PyTorchJob %s is created.", name), + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, 
commonutil.JobRunningReason), + Message: fmt.Sprintf("PyTorchJob %s is running.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + + By("Updating the PytorchJob with suspend=true") + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = pointer.Bool(true) + Expect(testK8sClient.Update(ctx, created)).Should(Succeed()) + + By("Checking if the pods and services are removed") + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + + By("Checking if the PyTorchJob has a suspended condition") + Eventually(func() bool { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.ReplicaStatuses[kubeflowv1.PyTorchJobReplicaTypeMaster].Active == 0 && + created.Status.ReplicaStatuses[kubeflowv1.PyTorchJobReplicaTypeWorker].Active == 0 && + created.Status.StartTime == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.ReplicaStatuses[kubeflowv1.PyTorchJobReplicaTypeMaster].Active == 0 && + created.Status.ReplicaStatuses[kubeflowv1.PyTorchJobReplicaTypeWorker].Active == 0 && + created.Status.StartTime == nil + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + Expect(created.Status.Conditions).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("PyTorchJob %s is created.", name), + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionFalse, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSuspendedReason), + Message: fmt.Sprintf("PyTorchJob %s is suspended.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSuspendedReason), + Message: fmt.Sprintf("PyTorchJob %s is suspended.", name), + Status: corev1.ConditionTrue, + }, + }, testutil.IgnoreJobConditionsTimes)) }) }) Context("When creating the elastic PyTorchJob", func() { - // TODO(gaocegege): Test with more than 1 worker. - It("Should get the corresponding resources successfully", func() { - // Define the expected elastic policy. 
- var ( - backendC10D = kubeflowv1.BackendC10D - minReplicas = pointer.Int32(1) - maxReplicas = pointer.Int32(3) - maxRestarts = pointer.Int32(3) - namespace = "default" - name = "easltic-job" - ) + const name = "easltic-job" + var ( + ctx = context.Background() + ns *corev1.Namespace + job *kubeflowv1.PyTorchJob + jobKey types.NamespacedName + workerKey types.NamespacedName + backendC10D = kubeflowv1.BackendC10D + minReplicas = int32(1) + maxReplicas = int32(3) + maxRestarts = int32(3) + ) + BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "elastic-pytorch-test-", + }, + } + Expect(testK8sClient.Create(ctx, ns)) - By("By creating a new PyTorchJob") - ctx := context.Background() - job := newPyTorchJobForTest(name, namespace) + job = newPyTorchJobForTest(name, ns.Name) + jobKey = client.ObjectKeyFromObject(job) + workerKey = types.NamespacedName{ + Name: fmt.Sprintf("%s-worker-0", name), + Namespace: ns.Name, + } + // Define the expected elastic policy. job.Spec.ElasticPolicy = &kubeflowv1.ElasticPolicy{ RDZVBackend: &backendC10D, - MaxReplicas: maxReplicas, - MinReplicas: minReplicas, - MaxRestarts: maxRestarts, + MinReplicas: &minReplicas, + MaxReplicas: &maxReplicas, + MaxRestarts: &maxRestarts, + Metrics: []autoscalingv2.MetricSpec{ + { + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricSource{ + Name: corev1.ResourceCPU, + Target: autoscalingv2.MetricTarget{ + Type: autoscalingv2.UtilizationMetricType, + AverageValue: resource.NewQuantity(80, resource.DecimalSI), + }, + }, + }, + }, } job.Spec.PyTorchReplicaSpecs = map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{ kubeflowv1.PyTorchJobReplicaTypeWorker: { @@ -209,19 +484,24 @@ var _ = Describe("PyTorchJob controller", func() { }, }, } - + }) + AfterEach(func() { + Expect(testK8sClient.Delete(ctx, job)).Should(Succeed()) + Expect(testK8sClient.Delete(ctx, ns)).Should(Succeed()) + }) + // TODO(gaocegege): Test with more than 1 worker. + It("Should get the corresponding resources successfully", func() { + By("By creating a new PyTorchJob") Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) - key := types.NamespacedName{Name: name, Namespace: namespace} created := &kubeflowv1.PyTorchJob{} // We'll need to retry getting this newly created PyTorchJob, given that creation may not immediately happen. Eventually(func() bool { - err := testK8sClient.Get(ctx, key, created) + err := testK8sClient.Get(ctx, jobKey, created) return err == nil }, testutil.Timeout, testutil.Interval).Should(BeTrue()) - workerKey := types.NamespacedName{Name: fmt.Sprintf("%s-worker-0", name), Namespace: namespace} pod := &corev1.Pod{} Eventually(func() bool { err := testK8sClient.Get(ctx, workerKey, pod) @@ -234,6 +514,11 @@ var _ = Describe("PyTorchJob controller", func() { return err == nil }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + hpa := &autoscalingv2.HorizontalPodAutoscaler{} + Eventually(func() error { + return testK8sClient.Get(ctx, jobKey, hpa) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + // Check pod port. 
 				Expect(pod.Spec.Containers[0].Ports).To(ContainElement(corev1.ContainerPort{
 					Name: kubeflowv1.PytorchJobDefaultPortName,
@@ -245,13 +530,13 @@ var _ = Describe("PyTorchJob controller", func() {
 					Value: string(backendC10D),
 				}, corev1.EnvVar{
 					Name:  EnvNNodes,
-					Value: fmt.Sprintf("%d:%d", *minReplicas, *maxReplicas),
+					Value: fmt.Sprintf("%d:%d", minReplicas, maxReplicas),
 				}, corev1.EnvVar{
 					Name:  EnvRDZVEndpoint,
 					Value: fmt.Sprintf("%s:%d", svc.Name, expectedPort),
 				}, corev1.EnvVar{
 					Name:  EnvMaxRestarts,
-					Value: fmt.Sprintf("%d", *maxRestarts),
+					Value: fmt.Sprintf("%d", maxRestarts),
 				}))
 				Expect(svc.Spec.Ports[0].Port).To(Equal(expectedPort))
 				// Check owner references.
@@ -278,7 +563,7 @@ var _ = Describe("PyTorchJob controller", func() {
 				pod.ResourceVersion = ""
 				Expect(testK8sClient.Status().Update(ctx, pod)).Should(Succeed())
 				Eventually(func() bool {
-					err := testK8sClient.Get(ctx, key, created)
+					err := testK8sClient.Get(ctx, jobKey, created)
 					if err != nil {
 						return false
 					}
@@ -288,8 +573,31 @@ var _ = Describe("PyTorchJob controller", func() {
 				// Check if the job is succeeded.
 				cond := getCondition(created.Status, kubeflowv1.JobSucceeded)
 				Expect(cond.Status).To(Equal(corev1.ConditionTrue))
-				By("Deleting the PyTorchJob")
-				Expect(testK8sClient.Delete(ctx, job)).Should(Succeed())
+			})
+			It("Should delete HPA once the PyTorchJob is suspended", func() {
+				By("By creating a new PyTorchJob")
+				Expect(testK8sClient.Create(ctx, job)).Should(Succeed())
+
+				created := &kubeflowv1.PyTorchJob{}
+				hpa := &autoscalingv2.HorizontalPodAutoscaler{}
+
+				By("Checking if the PyTorchJob and HPA are created")
+				Eventually(func() error {
+					return testK8sClient.Get(ctx, jobKey, created)
+				}, testutil.Timeout, testutil.Interval).Should(BeNil())
+				Eventually(func() error {
+					return testK8sClient.Get(ctx, jobKey, hpa)
+				}, testutil.Timeout, testutil.Interval).Should(BeNil())
+
+				By("Suspending PyTorchJob")
+				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
+				created.Spec.RunPolicy.Suspend = pointer.Bool(true)
+				Expect(testK8sClient.Update(ctx, created)).Should(Succeed())
+
+				By("Checking if the HPA is deleted")
+				Eventually(func() bool {
+					return errors.IsNotFound(testK8sClient.Get(ctx, jobKey, hpa))
+				}, testutil.Timeout, testutil.Interval).Should(BeTrue())
 			})
 		})
 	})
diff --git a/pkg/controller.v1/tensorflow/job_test.go b/pkg/controller.v1/tensorflow/job_test.go
index 2cb44a4abd..c41384c22d 100644
--- a/pkg/controller.v1/tensorflow/job_test.go
+++ b/pkg/controller.v1/tensorflow/job_test.go
@@ -244,7 +244,8 @@ var _ = Describe("TFJob controller", func() {
 			ctx := context.Background()
 			tc.tfJob.SetName(fmt.Sprintf(jobNameTemplate, idx))
 			tc.tfJob.SetUID(uuid.NewUUID())
-			Expect(commonutil.UpdateJobConditions(&tc.tfJob.Status, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), "")).Should(Succeed())
+			Expect(commonutil.UpdateJobConditions(&tc.tfJob.Status, kubeflowv1.JobSucceeded, corev1.ConditionTrue,
+				commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), "")).Should(Succeed())
 
 			refs := []metav1.OwnerReference{
 				*reconciler.GenOwnerReference(tc.tfJob),
diff --git a/pkg/controller.v1/tensorflow/tfjob_controller.go b/pkg/controller.v1/tensorflow/tfjob_controller.go
index e0d602fd7f..0be8067eac 100644
--- a/pkg/controller.v1/tensorflow/tfjob_controller.go
+++ b/pkg/controller.v1/tensorflow/tfjob_controller.go
@@ -454,8 +454,8 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 			if running > 0 {
 				msg := fmt.Sprintf("TFJob %s/%s is running.",
 					tfJob.Namespace, tfJob.Name)
-				err := commonutil.UpdateJobConditions(jobStatus,
-					kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg)
+				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
+					commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg)
 				if err != nil {
 					commonutil.LoggerForJob(tfJob).Infof(
 						"Append tfjob condition error: %v", err)
@@ -471,7 +471,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 					jobStatus.CompletionTime = &now
 				}
 				err := commonutil.UpdateJobConditions(jobStatus,
-					kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg)
+					kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg)
 				if err != nil {
 					commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
 					return err
@@ -493,7 +493,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 					jobStatus.CompletionTime = &now
 				}
 				err := commonutil.UpdateJobConditions(jobStatus,
-					kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg)
+					kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg)
 				if err != nil {
 					commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
 					return err
@@ -503,7 +503,8 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 				// Some workers are still running, leave a running condition.
 				msg := fmt.Sprintf("TFJob %s/%s is running.",
 					tfJob.Namespace, tfJob.Name)
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg)
+				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
+					commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg)
 				if err != nil {
 					commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
 					return err
@@ -517,7 +518,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 				// the restarting condition will be removed from jobStatus by kubeflowv1.filterOutCondition(),
 				// so we need to append the restarting condition back to jobStatus.
 				if existingRestartingCondition != nil {
-					err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, existingRestartingCondition.Reason, existingRestartingCondition.Message)
+					err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, existingRestartingCondition.Reason, existingRestartingCondition.Message)
 					if err != nil {
 						commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
 						return err
@@ -538,7 +539,8 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 					now := metav1.Now()
 					jobStatus.CompletionTime = &now
 				}
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobFailedReason), msg)
+				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue,
+					commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobFailedReason), msg)
 				if err != nil {
 					commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
 					return err
@@ -698,7 +700,8 @@ func (r *TFJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
 		msg := fmt.Sprintf("TFJob %s is created.", e.Object.GetName())
 		logrus.Info(msg)
 		trainingoperatorcommon.CreatedJobsCounterInc(tfJob.Namespace, r.GetFrameworkName())
-		if err := commonutil.UpdateJobConditions(&tfJob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobCreatedReason), msg); err != nil {
+		if err := commonutil.UpdateJobConditions(&tfJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue,
+			commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobCreatedReason), msg); err != nil {
 			log.Log.Error(err, "append job condition error")
 			return false
 		}
diff --git a/pkg/controller.v1/xgboost/status.go b/pkg/controller.v1/xgboost/status.go
index 4cae967e81..1377aab251 100644
--- a/pkg/controller.v1/xgboost/status.go
+++ b/pkg/controller.v1/xgboost/status.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 
 	"github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
 
 	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	commonutil "github.com/kubeflow/training-operator/pkg/util"
@@ -12,7 +13,7 @@ import (
 func setRunningCondition(logger *logrus.Entry, jobName string, jobStatus *kubeflowv1.JobStatus) error {
 	msg := fmt.Sprintf("XGBoostJob %s is running.", jobName)
 	if condition := findStatusCondition(jobStatus.Conditions, kubeflowv1.JobRunning); condition == nil {
-		err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), msg)
+		err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), msg)
 		if err != nil {
 			logger.Infof("Append job condition error: %v", err)
 			return err
diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller.go b/pkg/controller.v1/xgboost/xgboostjob_controller.go
index 1949346245..48b724dc6c 100644
--- a/pkg/controller.v1/xgboost/xgboostjob_controller.go
+++ b/pkg/controller.v1/xgboost/xgboostjob_controller.go
@@ -393,7 +393,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub
 					now := metav1.Now()
 					jobStatus.CompletionTime = &now
 				}
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), msg)
+				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), msg)
 				if err != nil {
 					logger.Infof("Append job condition error: %v", err)
 					return err
@@ -409,7 +409,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub
 				if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
 					msg := fmt.Sprintf("XGBoostJob %s is restarting because %d %s replica(s) failed.", xgboostJob.Name, failed, rtype)
 					r.Recorder.Event(xgboostJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRestartingReason), msg)
-					err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRestartingReason), msg)
+					err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRestartingReason), msg)
 					if err != nil {
 						logger.Infof("Append job condition error: %v", err)
 						return err
@@ -422,7 +422,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub
 						now := metav1.Now()
 						jobStatus.CompletionTime = &now
 					}
-					err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), msg)
+					err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), msg)
 					if err != nil {
 						logger.Infof("Append job condition error: %v", err)
 						return err
@@ -490,7 +490,8 @@ func (r *XGBoostJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool
 		msg := fmt.Sprintf("xgboostJob %s is created.", e.Object.GetName())
 		logrus.Info(msg)
 		trainingoperatorcommon.CreatedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName())
-		if err := commonutil.UpdateJobConditions(&xgboostJob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), msg); err != nil {
+		if err := commonutil.UpdateJobConditions(&xgboostJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue,
+			commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), msg); err != nil {
 			log.Log.Error(err, "append job condition error")
 			return false
 		}
diff --git a/pkg/reconciler.v1/common/job.go b/pkg/reconciler.v1/common/job.go
index 07ba630b8c..353a9d626f 100644
--- a/pkg/reconciler.v1/common/job.go
+++ b/pkg/reconciler.v1/common/job.go
@@ -214,7 +214,7 @@ func (r *JobReconciler) ReconcileJob(
 
 		r.GetRecorder().Event(job, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)
 
-		if err = commonutil.UpdateJobConditions(status, kubeflowv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
+		if err = commonutil.UpdateJobConditions(status, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
 			logrus.Infof(ErrAppendJobConditionTemplate, err)
 			return err
 		}
@@ -307,7 +308,8 @@ func (r *JobReconciler) UpdateJobStatus(
 		if r.IsFlagReplicaTypeForJobStatus(string(rtype)) {
 			if running > 0 {
 				msg := fmt.Sprintf("%s %s is running.", jobKind, jobNamespacedName)
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg)
+				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
+					commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg)
 				if err != nil {
 					logger.Info(ErrAppendJobConditionTemplate, err)
 					return err
@@ -322,7 +323,8 @@ func (r *JobReconciler) UpdateJobStatus(
 					now := metav1.Now()
 					jobStatus.CompletionTime = &now
 				}
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(jobKind, commonutil.JobSucceededReason), msg)
+				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue,
+					commonutil.NewReason(jobKind, commonutil.JobSucceededReason), msg)
 				if err != nil {
 					logger.Info(ErrAppendJobConditionTemplate, err)
 				}
@@ -335,7 +337,8 @@ func (r *JobReconciler) UpdateJobStatus(
 				msg := fmt.Sprintf("%s %s is restarting because %d %s replica(s) failed.",
 					jobKind, jobNamespacedName, failed, rtype)
 				r.GetRecorder().Event(job, corev1.EventTypeWarning, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg)
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg)
+				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue,
+					commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg)
 				if err != nil {
 					logger.Info(ErrAppendJobConditionTemplate, err)
 					return err
@@ -347,7 +350,8 @@ func (r *JobReconciler) UpdateJobStatus(
 					now := metav1.Now()
 					jobStatus.CompletionTime = &now
 				}
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), msg)
+				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue,
+					commonutil.NewReason(jobKind, commonutil.JobFailedReason), msg)
 				if err != nil {
 					logger.Info(ErrAppendJobConditionTemplate, err)
 					return err
@@ -360,7 +364,8 @@ func (r *JobReconciler) UpdateJobStatus(
 	msg := fmt.Sprintf("%s %s is running.", jobKind, jobNamespacedName)
 	logger.Info(msg)
 
-	if err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg); err != nil {
+	if err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
+		commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg); err != nil {
 		logger.Error(err, ErrUpdateJobConditionsFailed, jobKind)
 		return err
 	}
diff --git a/pkg/util/status.go b/pkg/util/status.go
index e268a33a8f..501d8d08b0 100644
--- a/pkg/util/status.go
+++ b/pkg/util/status.go
@@ -22,6 +22,10 @@ const (
 	JobRestartingReason = "Restarting"
 	// JobFailedValidationReason is added in a job when it failed validation
 	JobFailedValidationReason = "FailedValidation"
+	// JobSuspendedReason is added in a job when it is suspended.
+	JobSuspendedReason = "Suspended"
+	// JobResumedReason is added in a job when it is unsuspended.
+	JobResumedReason = "Resumed"
 )
 
 func NewReason(kind, reason string) string {
@@ -38,9 +42,22 @@ func IsFailed(status apiv1.JobStatus) bool {
 	return hasCondition(status, apiv1.JobFailed)
 }
 
+func IsRunning(status apiv1.JobStatus) bool {
+	return hasCondition(status, apiv1.JobRunning)
+}
+
+func IsSuspend(status apiv1.JobStatus) bool {
+	return hasCondition(status, apiv1.JobSuspended)
+}
+
 // UpdateJobConditions adds to the jobStatus a new condition if needed, with the conditionType, reason, and message
-func UpdateJobConditions(jobStatus *apiv1.JobStatus, conditionType apiv1.JobConditionType, reason, message string) error {
-	condition := newCondition(conditionType, reason, message)
+func UpdateJobConditions(
+	jobStatus *apiv1.JobStatus,
+	conditionType apiv1.JobConditionType,
+	conditionStatus v1.ConditionStatus,
+	reason, message string,
+) error {
+	condition := newCondition(conditionType, conditionStatus, reason, message)
 	setCondition(jobStatus, condition)
 	return nil
 }
@@ -55,10 +72,10 @@ func hasCondition(status apiv1.JobStatus, condType apiv1.JobConditionType) bool
 }
 
 // newCondition creates a new job condition.
-func newCondition(conditionType apiv1.JobConditionType, reason, message string) apiv1.JobCondition {
+func newCondition(conditionType apiv1.JobConditionType, conditionStatus v1.ConditionStatus, reason, message string) apiv1.JobCondition {
 	return apiv1.JobCondition{
 		Type:               conditionType,
-		Status:             v1.ConditionTrue,
+		Status:             conditionStatus,
 		LastUpdateTime:     metav1.Now(),
 		LastTransitionTime: metav1.Now(),
 		Reason:             reason,
diff --git a/pkg/util/status_test.go b/pkg/util/status_test.go
index 62b8cb4571..7d6f4b634a 100644
--- a/pkg/util/status_test.go
+++ b/pkg/util/status_test.go
@@ -32,13 +32,37 @@ func TestIsFailed(t *testing.T) {
 	assert.True(t, IsFailed(jobStatus))
 }
 
+func TestIsRunning(t *testing.T) {
+	jobStatus := apiv1.JobStatus{
+		Conditions: []apiv1.JobCondition{
+			{
+				Type:   apiv1.JobRunning,
+				Status: corev1.ConditionTrue,
+			},
+		},
+	}
+	assert.True(t, IsRunning(jobStatus))
+}
+
+func TestIsSuspend(t *testing.T) {
+	jobStatus := apiv1.JobStatus{
+		Conditions: []apiv1.JobCondition{
+			{
+				Type:   apiv1.JobSuspended,
+				Status: corev1.ConditionTrue,
+			},
+		},
+	}
+	assert.True(t, IsSuspend(jobStatus))
+}
+
 func TestUpdateJobConditions(t *testing.T) {
 	jobStatus := apiv1.JobStatus{}
 	conditionType := apiv1.JobCreated
 	reason := "Job Created"
 	message := "Job Created"
 
-	err := UpdateJobConditions(&jobStatus, conditionType, reason, message)
+	err := UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message)
 	if assert.NoError(t, err) {
 		// Check JobCreated condition is appended
 		conditionInStatus := jobStatus.Conditions[0]
@@ -50,7 +74,7 @@ func TestUpdateJobConditions(t *testing.T) {
 	conditionType = apiv1.JobRunning
 	reason = "Job Running"
 	message = "Job Running"
-	err = UpdateJobConditions(&jobStatus, conditionType, reason, message)
+	err = UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message)
 	if assert.NoError(t, err) {
 		// Check JobRunning condition is appended
 		conditionInStatus := jobStatus.Conditions[1]
@@ -62,7 +86,7 @@ func TestUpdateJobConditions(t *testing.T) {
 	conditionType = apiv1.JobRestarting
 	reason = "Job Restarting"
 	message = "Job Restarting"
-	err = UpdateJobConditions(&jobStatus, conditionType, reason, message)
+	err = UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message)
 	if assert.NoError(t, err) {
 		// Check JobRunning condition is filtered out and JobRestarting state is appended
 		conditionInStatus := jobStatus.Conditions[1]
@@ -74,7 +98,7 @@ func TestUpdateJobConditions(t *testing.T) {
 	conditionType = apiv1.JobRunning
 	reason = "Job Running"
 	message = "Job Running"
-	err = UpdateJobConditions(&jobStatus, conditionType, reason, message)
+	err = UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message)
 	if assert.NoError(t, err) {
 		// Again, Check JobRestarting condition is filtered and JobRestarting is appended
 		conditionInStatus := jobStatus.Conditions[1]
@@ -86,7 +110,7 @@ func TestUpdateJobConditions(t *testing.T) {
 	conditionType = apiv1.JobFailed
 	reason = "Job Failed"
 	message = "Job Failed"
-	err = UpdateJobConditions(&jobStatus, conditionType, reason, message)
+	err = UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message)
 	if assert.NoError(t, err) {
 		// Check JobRunning condition is set to false
 		jobRunningCondition := jobStatus.Conditions[1]
diff --git a/pkg/util/testutil/constants.go b/pkg/util/testutil/constants.go
index f935731fcf..74e0a11796 100644
--- a/pkg/util/testutil/constants.go
+++ b/pkg/util/testutil/constants.go
@@ -1,8 +1,19 @@
 package testutil
 
-import "time"
+import (
+	"time"
+
+	"github.com/google/go-cmp/cmp/cmpopts"
+
+	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
+)
 
 const (
-	Timeout  = 30 * time.Second
-	Interval = 250 * time.Millisecond
+	Timeout            = 30 * time.Second
+	Interval           = 250 * time.Millisecond
+	ConsistentDuration = 3 * time.Second
+)
+
+var (
+	IgnoreJobConditionsTimes = cmpopts.IgnoreFields(kubeflowv1.JobCondition{}, "LastUpdateTime", "LastTransitionTime")
 )
diff --git a/pkg/util/train/train_util.go b/pkg/util/train/train_util.go
index fbb120a8cb..cb5295c2c6 100644
--- a/pkg/util/train/train_util.go
+++ b/pkg/util/train/train_util.go
@@ -15,6 +15,16 @@
 // Package that various helper routines for training.
 package train
 
+import (
+	"k8s.io/utils/pointer"
+
+	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
+)
+
 func IsRetryableExitCode(exitCode int32) bool {
 	return exitCode >= 128
 }
+
+func IsJobSuspended(runPolicy *kubeflowv1.RunPolicy) bool {
+	return runPolicy != nil && pointer.BoolDeref(runPolicy.Suspend, false)
+}
diff --git a/pkg/util/train/train_util_test.go b/pkg/util/train/train_util_test.go
index 3a95ee554f..e6bd292ecf 100644
--- a/pkg/util/train/train_util_test.go
+++ b/pkg/util/train/train_util_test.go
@@ -14,7 +14,13 @@
 
 package train
 
-import "testing"
+import (
+	"testing"
+
+	"k8s.io/utils/pointer"
+
+	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
+)
 
 func TestIsRetryableExitCode(t *testing.T) {
 	tcs := []struct {
@@ -50,3 +56,41 @@
 		}
 	}
 }
+
+func TestIsJobSuspended(t *testing.T) {
+	cases := map[string]struct {
+		runPolicy *kubeflowv1.RunPolicy
+		want      bool
+	}{
+		"runPolicy is nil": {
+			runPolicy: nil,
+			want:      false,
+		},
+		"suspend is nil": {
+			runPolicy: &kubeflowv1.RunPolicy{
+				Suspend: nil,
+			},
+			want: false,
+		},
+		"suspend is false": {
+			runPolicy: &kubeflowv1.RunPolicy{
+				Suspend: pointer.Bool(false),
+			},
+			want: false,
+		},
+		"suspend is true": {
+			runPolicy: &kubeflowv1.RunPolicy{
+				Suspend: pointer.Bool(true),
+			},
+			want: true,
+		},
+	}
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			got := IsJobSuspended(tc.runPolicy)
+			if tc.want != got {
+				t.Errorf("Unexpected suspended from IsJobSuspended \nwant: %v\n, \ngot: %v\n", tc.want, got)
+			}
+		})
+	}
+}
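
A minimal usage sketch of the new field for reviewers (illustrative only: the job name, image, and replica layout below are placeholders; only spec.runPolicy.suspend comes from the CRD change in this patch):

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-example            # placeholder name
spec:
  runPolicy:
    suspend: true                  # while true, the controller creates no Pods and deletes active Pods/PodGroups; set back to false to resume
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: registry.example.com/pytorch-train:latest   # placeholder image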