From 9e084ff0b0904b82312225c4baca295baf482b1e Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Tue, 11 Jul 2023 03:18:54 +0900 Subject: [PATCH] Use the same reasons for Condition and Event (#1854) --- pkg/controller.v1/common/job.go | 5 ++- pkg/controller.v1/common/pod.go | 6 ++- pkg/controller.v1/mpi/mpijob.go | 10 ----- pkg/controller.v1/mpi/mpijob_controller.go | 38 ++++++++++--------- pkg/controller.v1/mxnet/mxjob_controller.go | 28 +++++--------- .../paddlepaddle/paddlepaddle_controller.go | 25 ++++++------ .../pytorch/pytorchjob_controller.go | 25 ++++++------ pkg/controller.v1/tensorflow/job_test.go | 2 +- .../tensorflow/tfjob_controller.go | 29 ++++++-------- .../tensorflow/tfjob_controller_test.go | 7 ++-- pkg/controller.v1/xgboost/status.go | 2 +- pkg/controller.v1/xgboost/status_test.go | 15 ++++---- .../xgboost/xgboostjob_controller.go | 27 +++++-------- pkg/reconciler.v1/common/job.go | 19 +++++----- pkg/util/status.go | 24 +++++++----- 15 files changed, 122 insertions(+), 140 deletions(-) diff --git a/pkg/controller.v1/common/job.go b/pkg/controller.v1/common/job.go index 4805c260bc..a218cfb3be 100644 --- a/pkg/controller.v1/common/job.go +++ b/pkg/controller.v1/common/job.go @@ -93,6 +93,7 @@ func (jc *JobController) ReconcileJobs( utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err)) return err } + jobKind := jc.Controller.GetAPIGroupVersionKind().Kind // Reset expectations // 1. Since `ReconcileJobs` is called, we expect that previous expectations are all satisfied, // and it's safe to reset the expectations @@ -222,9 +223,9 @@ func (jc *JobController) ReconcileJobs( } } - jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.JobFailedReason, failureMessage) + jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) - if err := commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.JobFailedReason, failureMessage); err != nil { + if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil { log.Infof("Append job condition error: %v", err) return err } diff --git a/pkg/controller.v1/common/pod.go b/pkg/controller.v1/common/pod.go index 0d975c52c4..7ade04d81b 100644 --- a/pkg/controller.v1/common/pod.go +++ b/pkg/controller.v1/common/pod.go @@ -359,8 +359,10 @@ func (jc *JobController) ReconcilePods( msg := fmt.Sprintf("job %s is restarting because %s replica(s) failed.", metaObject.GetName(), rType) - jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, "JobRestarting", msg) - if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, "JobRestarting", msg); err != nil { + jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, + commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg) + if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, + commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg); err != nil { commonutil.LoggerForJob(metaObject).Infof("Append job condition error: %v", err) return err } diff --git a/pkg/controller.v1/mpi/mpijob.go b/pkg/controller.v1/mpi/mpijob.go index 86d67bf6d5..accb6788ac 100644 --- a/pkg/controller.v1/mpi/mpijob.go +++ b/pkg/controller.v1/mpi/mpijob.go @@ -72,17 +72,7 @@ const ( // podTemplateSchedulerNameReason is the warning reason when other scheduler name is set // in pod 
templates with gang-scheduling enabled podTemplateSchedulerNameReason = "SettedPodTemplateSchedulerName" -) -const ( - // mpiJobCreatedReason is added in a mpijob when it is created. - mpiJobCreatedReason = "MPIJobCreated" - // mpiJobSucceededReason is added in a mpijob when it is succeeded. - mpiJobSucceededReason = "MPIJobSucceeded" - // mpiJobRunningReason is added in a mpijob when it is running. - mpiJobRunningReason = "MPIJobRunning" - // mpiJobFailedReason is added in a mpijob when it is failed. - mpiJobFailedReason = "MPIJobFailed" // mpiJobEvict mpiJobEvict = "MPIJobEvicted" ) diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go index e88479f874..9a520d55e2 100644 --- a/pkg/controller.v1/mpi/mpijob_controller.go +++ b/pkg/controller.v1/mpi/mpijob_controller.go @@ -137,7 +137,8 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct if err = kubeflowv1.ValidateV1MpiJobSpec(&mpijob.Spec); err != nil { logger.Error(err, "MPIJob failed validation") - jc.Recorder.Eventf(mpijob, corev1.EventTypeWarning, commonutil.JobFailedValidationReason, "MPIJob failed validation because %s", err) + jc.Recorder.Eventf(mpijob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedValidationReason), + "MPIJob failed validation because %s", err) return ctrl.Result{}, err } @@ -319,7 +320,8 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, mpiJobCreatedReason, msg); err != nil { + if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, + commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg); err != nil { log.Log.Error(err, "append job condition error") return false } @@ -395,10 +397,10 @@ func (jc *MPIJobReconciler) ReconcilePods( if launcher == nil { launcher, err = jc.KubeClientSet.CoreV1().Pods(mpiJob.Namespace).Create(context.Background(), jc.newLauncher(mpiJob, ctlrconfig.Config.MPIKubectlDeliveryImage, isGPULauncher), metav1.CreateOptions{}) if err != nil { - jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err) + jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason), "launcher pod created failed: %v", err) return err } else { - jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, mpiJobRunningReason, "launcher pod created success: %v", launcher.Name) + jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), "launcher pod created success: %v", launcher.Name) } } } @@ -418,12 +420,12 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch if isPodSucceeded(launcher) { mpiJob.Status.ReplicaStatuses[kubeflowv1.MPIJobReplicaTypeLauncher].Succeeded = 1 msg := fmt.Sprintf("MPIJob %s/%s successfully completed.", mpiJob.Namespace, mpiJob.Name) - jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, mpiJobSucceededReason, msg) + jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg) if mpiJob.Status.CompletionTime == nil { now := metav1.Now()
mpiJob.Status.CompletionTime = &now } - err := updateMPIJobConditions(mpiJob, kubeflowv1.JobSucceeded, mpiJobSucceededReason, msg) + err := updateMPIJobConditions(mpiJob, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg) if err != nil { return err } @@ -432,7 +434,7 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch msg := fmt.Sprintf("MPIJob %s/%s has failed", mpiJob.Namespace, mpiJob.Name) reason := launcher.Status.Reason if reason == "" { - reason = mpiJobFailedReason + reason = commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason) } jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, reason, msg) if reason == "Evicted" { @@ -482,11 +484,11 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch if launcher != nil && launcher.Status.Phase == corev1.PodRunning && running == len(worker) { msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name) - err := updateMPIJobConditions(mpiJob, kubeflowv1.JobRunning, mpiJobRunningReason, msg) + err := updateMPIJobConditions(mpiJob, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg) if err != nil { return err } - jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name) + jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name) } return nil } @@ -581,7 +583,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl if rtype == kubeflowv1.MPIJobReplicaTypeLauncher { if running > 0 { msg := fmt.Sprintf("MPIJob %s is running.", mpiJob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.JobRunningReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) return err @@ -591,12 +593,12 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl if expected == 0 { msg := fmt.Sprintf("MPIJob %s is successfully completed.", mpiJob.Name) logrus.Info(msg) - jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.JobSucceededReason, msg) + jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.JobSucceededReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) return err @@ -608,8 +610,8 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl if failed > 0 { if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { msg := fmt.Sprintf("MPIJob %s is restarting because %d %s replica(s) failed.", mpiJob.Name, failed, rtype) - jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.JobRestartingReason, msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, 
commonutil.JobRestartingReason, msg) + jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg) if err != nil { commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) return err @@ -617,12 +619,13 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl trainingoperatorcommon.RestartedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName()) } else { msg := fmt.Sprintf("MPIJob %s is failed because %d %s replica(s) failed.", mpiJob.Name, failed, rtype) - jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.JobFailedReason, msg) + jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.JobFailedReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, + commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason), msg) if err != nil { commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) return err @@ -917,7 +920,8 @@ func (jc *MPIJobReconciler) getOrCreateWorker(mpiJob *kubeflowv1.MPIJob) ([]*cor // can attempt processing again later. This could have been caused by a // temporary network failure, or any other transient reason. if err != nil && !errors.IsNotFound(err) { - jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "worker pod created failed: %v", err) + jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason), + "worker pod created failed: %v", err) return nil, err } // If the worker is not controlled by this MPIJob resource, we should log diff --git a/pkg/controller.v1/mxnet/mxjob_controller.go b/pkg/controller.v1/mxnet/mxjob_controller.go index 6836101d29..680ef313b0 100644 --- a/pkg/controller.v1/mxnet/mxjob_controller.go +++ b/pkg/controller.v1/mxnet/mxjob_controller.go @@ -56,15 +56,6 @@ import ( const ( controllerName = "mxjob-controller" - - // mxJobSucceededReason is added in a mxjob when it is succeeded. - mxJobSucceededReason = "MXJobSucceeded" - // mxJobRunningReason is added in a mxjob when it is running. - mxJobRunningReason = "MXJobRunning" - // mxJobFailedReason is added in a mxjob when it is failed. - mxJobFailedReason = "MXJobFailed" - // mxJobRestarting is added in a mxjob when it is restarting.
- mxJobRestartingReason = "MXJobRestarting" ) // NewReconciler creates a MXJob Reconciler @@ -133,7 +124,8 @@ func (r *MXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl if err = kubeflowv1.ValidateV1MXJob(mxjob); err != nil { logger.Error(err, "MXJob failed validation") - r.Recorder.Eventf(mxjob, corev1.EventTypeWarning, commonutil.JobFailedValidationReason, "MXJob failed validation because %s", err) + r.Recorder.Eventf(mxjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedValidationReason), + "MXJob failed validation because %s", err) return ctrl.Result{}, err } @@ -379,7 +371,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow if rtype == kubeflowv1.MXJobReplicaTypeScheduler || singleTraining { if running > 0 { msg := fmt.Sprintf("MXJob %s is running.", mxjob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, mxJobRunningReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRunningReason), msg) if err != nil { logrus.Infof("Append mxjob condition error: %v", err) return err @@ -388,12 +380,12 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow // when scheduler is succeeded, the job is finished. if expected == 0 { msg := fmt.Sprintf("MXJob %s is successfully completed.", mxjob.Name) - r.Recorder.Event(mxjob, corev1.EventTypeNormal, mxJobSucceededReason, msg) + r.Recorder.Event(mxjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, mxJobSucceededReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg) if err != nil { logrus.Infof("Append mxjob condition error: %v", err) return err @@ -405,8 +397,8 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow if failed > 0 { if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { msg := fmt.Sprintf("mxjob %s is restarting because %d %s replica(s) failed.", mxjob.Name, failed, rtype) - r.Recorder.Event(mxjob, corev1.EventTypeWarning, mxJobRestartingReason, msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, mxJobRestartingReason, msg) + r.Recorder.Event(mxjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg) if err != nil { logrus.Infof("Append job condition error: %v", err) return err @@ -414,12 +406,12 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow trainingoperatorcommon.RestartedJobsCounterInc(mxjob.Namespace, r.GetFrameworkName()) } else { msg := fmt.Sprintf("mxjob %s is failed because %d %s replica(s) failed.", mxjob.Name, failed, rtype) - r.Recorder.Event(mxjob, corev1.EventTypeNormal, mxJobFailedReason, msg) + r.Recorder.Event(mxjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err :=
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, mxJobFailedReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg) if err != nil { logrus.Infof("Append job condition error: %v", err) return err @@ -486,7 +478,7 @@ func (r *MXJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("MXJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(mxJob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, "MXJobCreated", msg); err != nil { + if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobCreatedReason), msg); err != nil { logrus.Error(err, "append job condition error") return false } diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go index 69e35f1ed9..e94c31b38d 100644 --- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go +++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go @@ -133,7 +133,8 @@ func (r *PaddleJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if err = kubeflowv1.ValidateV1PaddleJob(paddlejob); err != nil { logger.Error(err, "PaddleJob failed validation") - r.Recorder.Eventf(paddlejob, corev1.EventTypeWarning, commonutil.JobFailedValidationReason, "PaddleJob failed validation because %s", err) + r.Recorder.Eventf(paddlejob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobFailedValidationReason), + "PaddleJob failed validation because %s", err) return ctrl.Result{}, err } @@ -392,7 +393,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, if rtype == kubeflowv1.PaddleJobReplicaTypeMaster { if running > 0 { msg := fmt.Sprintf("PaddleJob %s is running.", paddlejob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.JobRunningReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -402,12 +403,12 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, if expected == 0 { msg := fmt.Sprintf("PaddleJob %s is successfully completed.", paddlejob.Name) logrus.Info(msg) - r.Recorder.Event(paddlejob, corev1.EventTypeNormal, commonutil.JobSucceededReason, msg) + r.Recorder.Event(paddlejob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.JobSucceededReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -422,13 +423,13 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, if expected == 0 { msg := fmt.Sprintf("PaddleJob %s/%s successfully completed.", paddlejob.Namespace, paddlejob.Name) - r.recorder.Event(paddlejob, corev1.EventTypeNormal, 
commonutil.JobSucceededReason, msg) + r.recorder.Event(paddlejob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, commonutil.JobSucceededReason, msg) + kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append paddlejob condition error: %v", err) return err @@ -438,7 +439,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, // Some workers are still running, leave a running condition. msg := fmt.Sprintf("PaddleJob %s/%s is running.", paddlejob.Namespace, paddlejob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.JobRunningReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append paddlejob condition error: %v", err) return err @@ -450,8 +451,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, if failed > 0 && (specReplicas > succeeded+running) { if spec.RestartPolicy != kubeflowv1.RestartPolicyNever { msg := fmt.Sprintf("PaddleJob %s is restarting because %d %s replica(s) failed.", paddlejob.Name, failed, rtype) - r.Recorder.Event(paddlejob, corev1.EventTypeWarning, commonutil.JobRestartingReason, msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.JobRestartingReason, msg) + r.Recorder.Event(paddlejob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -459,12 +460,12 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, trainingoperatorcommon.RestartedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName()) } else { msg := fmt.Sprintf("PaddleJob %s is failed because %d %s replica(s) failed.", paddlejob.Name, failed, rtype) - r.Recorder.Event(paddlejob, corev1.EventTypeNormal, commonutil.JobFailedReason, msg) + r.Recorder.Event(paddlejob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobFailedReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.JobFailedReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobFailedReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -549,7 +550,7 @@ func (r *PaddleJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("PaddleJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&paddlejob.Status, kubeflowv1.JobCreated, "PaddleJobCreated", msg); err != nil { + if err := 
commonutil.UpdateJobConditions(&paddlejob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobCreatedReason), msg); err != nil { logrus.Error(err, "append job condition error") return false } diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller.go b/pkg/controller.v1/pytorch/pytorchjob_controller.go index 7ee3b6f3b9..6344757cc2 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller.go @@ -134,7 +134,8 @@ func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err = kubeflowv1.ValidateV1PyTorchJob(pytorchjob); err != nil { logger.Error(err, "PyTorchJob failed validation") - r.Recorder.Eventf(pytorchjob, corev1.EventTypeWarning, commonutil.JobFailedValidationReason, "PyTorchJob failed validation because %s", err) + r.Recorder.Eventf(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedValidationReason), + "PyTorchJob failed validation because %s", err) return ctrl.Result{}, err } @@ -391,7 +392,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if rtype == kubeflowv1.PyTorchJobReplicaTypeMaster { if running > 0 { msg := fmt.Sprintf("PyTorchJob %s is running.", pytorchjob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.JobRunningReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -401,12 +402,12 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if expected == 0 { msg := fmt.Sprintf("PyTorchJob %s is successfully completed.", pytorchjob.Name) logrus.Info(msg) - r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.JobSucceededReason, msg) + r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.JobSucceededReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -424,13 +425,13 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if expected == 0 || (pytorchjob.Spec.ElasticPolicy != nil && succeeded > 0) { msg := fmt.Sprintf("PyTorchJob %s/%s successfully completed.", pytorchjob.Namespace, pytorchjob.Name) - r.recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.JobSucceededReason, msg) + r.recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, commonutil.JobSucceededReason, msg) + kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append pytorchjob condition error: %v", err) return err @@ -440,7 +441,7 @@ func (r 
*PyTorchJobReconciler) UpdateJobStatus(job interface{}, // Some workers are still running, leave a running condition. msg := fmt.Sprintf("PyTorchJob %s/%s is running.", pytorchjob.Namespace, pytorchjob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.JobRunningReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append pytorchjob condition error: %v", err) return err @@ -452,8 +453,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if failed > 0 && (specReplicas > succeeded+running) { if spec.RestartPolicy != kubeflowv1.RestartPolicyNever { msg := fmt.Sprintf("PyTorchJob %s is restarting because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype) - r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.JobRestartingReason, msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.JobRestartingReason, msg) + r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -461,12 +462,12 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, trainingoperatorcommon.RestartedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) } else { msg := fmt.Sprintf("PyTorchJob %s is failed because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype) - r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.JobFailedReason, msg) + r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.JobFailedReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -552,7 +553,7 @@ func (r *PyTorchJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool msg := fmt.Sprintf("PyTorchJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, "PyTorchJobCreated", msg); err != nil { + if err := commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), msg); err != nil { logrus.Error(err, "append job condition error") return false } diff --git a/pkg/controller.v1/tensorflow/job_test.go b/pkg/controller.v1/tensorflow/job_test.go index 99e8adc16d..2cb44a4abd 100644 --- a/pkg/controller.v1/tensorflow/job_test.go +++ b/pkg/controller.v1/tensorflow/job_test.go @@ -244,7 +244,7 @@ var _ = Describe("TFJob controller", func() { ctx := context.Background() tc.tfJob.SetName(fmt.Sprintf(jobNameTemplate, idx)) tc.tfJob.SetUID(uuid.NewUUID()) - 
Expect(commonutil.UpdateJobConditions(&tc.tfJob.Status, kubeflowv1.JobSucceeded, tfJobSucceededReason, "")).Should(Succeed()) + Expect(commonutil.UpdateJobConditions(&tc.tfJob.Status, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), "")).Should(Succeed()) refs := []metav1.OwnerReference{ *reconciler.GenOwnerReference(tc.tfJob), diff --git a/pkg/controller.v1/tensorflow/tfjob_controller.go b/pkg/controller.v1/tensorflow/tfjob_controller.go index f5dbbabd50..e0d602fd7f 100644 --- a/pkg/controller.v1/tensorflow/tfjob_controller.go +++ b/pkg/controller.v1/tensorflow/tfjob_controller.go @@ -55,13 +55,6 @@ import ( ) const ( - // tfJobSucceededReason is added in a tfjob when it is succeeded. - tfJobSucceededReason = "TFJobSucceeded" - // tfJobRunningReason is added in a tfjob when it is running. - tfJobRunningReason = "TFJobRunning" - // tfJobFailedReason is added in a tfjob when it is failed. - tfJobFailedReason = "TFJobFailed" - FailedDeleteJobReason = "FailedDeleteJob" SuccessfulDeleteJobReason = "SuccessfulDeleteJob" @@ -136,7 +129,8 @@ func (r *TFJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl if err = kubeflowv1.ValidateV1TFJob(tfjob); err != nil { logger.Error(err, "TFJob failed validation") - r.Recorder.Eventf(tfjob, corev1.EventTypeWarning, commonutil.JobFailedValidationReason, "TFJob failed validation because %s", err) + r.Recorder.Eventf(tfjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobFailedValidationReason), + "TFJob failed validation because %s", err) return ctrl.Result{}, err } @@ -461,7 +455,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow msg := fmt.Sprintf("TFJob %s/%s is running.", tfJob.Namespace, tfJob.Name) err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobRunning, tfJobRunningReason, msg) + kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof( "Append tfjob condition error: %v", err) @@ -471,13 +465,13 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow if expected == 0 { msg := fmt.Sprintf("TFJob %s/%s successfully completed.", tfJob.Namespace, tfJob.Name) - r.recorder.Event(tfJob, corev1.EventTypeNormal, tfJobSucceededReason, msg) + r.recorder.Event(tfJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, tfJobSucceededReason, msg) + kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err @@ -493,13 +487,13 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow if expected == 0 || (worker0Completed && *tfJob.Spec.SuccessPolicy != kubeflowv1.SuccessPolicyAllWorkers) { msg := fmt.Sprintf("TFJob %s/%s successfully completed.", tfJob.Namespace, tfJob.Name) - r.recorder.Event(tfJob, corev1.EventTypeNormal, tfJobSucceededReason, msg) + r.recorder.Event(tfJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } err := 
commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, tfJobSucceededReason, msg) + kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err @@ -509,7 +503,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow // Some workers are still running, leave a running condition. msg := fmt.Sprintf("TFJob %s/%s is running.", tfJob.Namespace, tfJob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, tfJobRunningReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err @@ -539,13 +533,12 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow } msg := fmt.Sprintf("TFJob %s/%s has failed because %d %s replica(s) failed.", tfJob.Namespace, tfJob.Name, failed, rtype) - r.recorder.Event(tfJob, corev1.EventTypeNormal, tfJobFailedReason, msg) + r.recorder.Event(tfJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobFailedReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobFailed, tfJobFailedReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobFailedReason), msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err @@ -705,7 +698,7 @@ func (r *TFJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("TFJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(tfJob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&tfJob.Status, kubeflowv1.JobCreated, "TFJobCreated", msg); err != nil { + if err := commonutil.UpdateJobConditions(&tfJob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobCreatedReason), msg); err != nil { log.Log.Error(err, "append job condition error") return false } diff --git a/pkg/controller.v1/tensorflow/tfjob_controller_test.go b/pkg/controller.v1/tensorflow/tfjob_controller_test.go index f7db36570e..4236ccb0c2 100644 --- a/pkg/controller.v1/tensorflow/tfjob_controller_test.go +++ b/pkg/controller.v1/tensorflow/tfjob_controller_test.go @@ -27,6 +27,7 @@ import ( kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" tftestutil "github.com/kubeflow/training-operator/pkg/controller.v1/tensorflow/testutil" + commonutil "github.com/kubeflow/training-operator/pkg/util" ) var _ = Describe("TFJob controller", func() { @@ -119,7 +120,7 @@ var _ = Describe("TFJob controller", func() { 0, 0, 0, 4, 0, 0, 2, 0, 0, - &tfJobRunning, tfJobRunningReason, + &tfJobRunning, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), true, }, "Distributed TFJob (4 workers, 2 PS) is created, 2 workers, 1 PS are pending": { @@ -141,7 +142,7 @@ var _ = Describe("TFJob controller", func() { 2, 0, 2, 1, 0, 0, 0, 0, 0, - &tfJobRunning, tfJobRunningReason, + &tfJobRunning, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), false, }, "Distributed TFJob (4 workers, 2 PS) is 
created, 2 workers, 1 PS are pending, 1 worker is succeeded": { @@ -163,7 +164,7 @@ var _ = Describe("TFJob controller", func() { 0, 0, 0, 0, 4, 0, 0, 2, 0, - &tfJobSucceeded, tfJobSucceededReason, + &tfJobSucceeded, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), false, }, } diff --git a/pkg/controller.v1/xgboost/status.go b/pkg/controller.v1/xgboost/status.go index 534c7533d1..4cae967e81 100644 --- a/pkg/controller.v1/xgboost/status.go +++ b/pkg/controller.v1/xgboost/status.go @@ -12,7 +12,7 @@ import ( func setRunningCondition(logger *logrus.Entry, jobName string, jobStatus *kubeflowv1.JobStatus) error { msg := fmt.Sprintf("XGBoostJob %s is running.", jobName) if condition := findStatusCondition(jobStatus.Conditions, kubeflowv1.JobRunning); condition == nil { - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, xgboostJobRunningReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), msg) if err != nil { logger.Infof("Append job condition error: %v", err) return err diff --git a/pkg/controller.v1/xgboost/status_test.go b/pkg/controller.v1/xgboost/status_test.go index c91e99c1e7..0649f6b785 100644 --- a/pkg/controller.v1/xgboost/status_test.go +++ b/pkg/controller.v1/xgboost/status_test.go @@ -9,6 +9,7 @@ import ( corev1 "k8s.io/api/core/v1" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + commonutil "github.com/kubeflow/training-operator/pkg/util" ) var ignoreJobConditionsTimeOpts = cmpopts.IgnoreFields(kubeflowv1.JobCondition{}, "LastUpdateTime", "LastTransitionTime") @@ -24,7 +25,7 @@ func TestSetRunningCondition(t *testing.T) { input: []kubeflowv1.JobCondition{ { Type: kubeflowv1.JobSucceeded, - Reason: "XGBoostJobSucceeded", + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), Message: "XGBoostJob test-xbgoostjob is successfully completed.", Status: corev1.ConditionTrue, }, @@ -32,13 +33,13 @@ func TestSetRunningCondition(t *testing.T) { want: []kubeflowv1.JobCondition{ { Type: kubeflowv1.JobSucceeded, - Reason: "XGBoostJobSucceeded", + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), Message: "XGBoostJob test-xbgoostjob is successfully completed.", Status: corev1.ConditionTrue, }, { Type: kubeflowv1.JobRunning, - Reason: "XGBoostJobRunning", + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), Message: "XGBoostJob test-xbgoostjob is running.", Status: corev1.ConditionTrue, }, @@ -48,13 +49,13 @@ func TestSetRunningCondition(t *testing.T) { input: []kubeflowv1.JobCondition{ { Type: kubeflowv1.JobFailed, - Reason: "XGBoostJobFailed", + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), Message: "XGBoostJob test-sgboostjob is failed because 2 Worker replica(s) failed.", Status: corev1.ConditionTrue, }, { Type: kubeflowv1.JobRunning, - Reason: "XGBoostJobRunning", + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), Message: "XGBoostJob test-xbgoostjob is running.", Status: corev1.ConditionTrue, }, @@ -62,13 +63,13 @@ func TestSetRunningCondition(t *testing.T) { want: []kubeflowv1.JobCondition{ { Type: kubeflowv1.JobFailed, - Reason: "XGBoostJobFailed", + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), Message: "XGBoostJob test-sgboostjob is failed because 2 Worker replica(s) failed.", 
Status: corev1.ConditionTrue, }, { Type: kubeflowv1.JobRunning, - Reason: "XGBoostJobRunning", + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), Message: "XGBoostJob test-xbgoostjob is running.", Status: corev1.ConditionTrue, }, diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller.go b/pkg/controller.v1/xgboost/xgboostjob_controller.go index 1fff2d3761..1949346245 100644 --- a/pkg/controller.v1/xgboost/xgboostjob_controller.go +++ b/pkg/controller.v1/xgboost/xgboostjob_controller.go @@ -61,16 +61,6 @@ const ( // Reasons for job events. FailedDeleteJobReason = "FailedDeleteJob" SuccessfulDeleteJobReason = "SuccessfulDeleteJob" - // xgboostJobCreatedReason is added in a job when it is created. - xgboostJobCreatedReason = "XGBoostJobCreated" - // xgboostJobSucceededReason is added in a job when it is succeeded. - xgboostJobSucceededReason = "XGBoostJobSucceeded" - // xgboostJobRunningReason is added in a job when it is running. - xgboostJobRunningReason = "XGBoostJobRunning" - // xgboostJobFailedReason is added in a job when it is failed. - xgboostJobFailedReason = "XGBoostJobFailed" - // xgboostJobRestartingReason is added in a job when it is restarting. - xgboostJobRestartingReason = "XGBoostJobRestarting" ) // NewReconciler creates a XGBoostJob Reconciler @@ -143,7 +133,8 @@ func (r *XGBoostJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err = kubeflowv1.ValidateV1XGBoostJob(xgboostjob); err != nil { logger.Error(err, "XGBoostJob failed validation") - r.Recorder.Eventf(xgboostjob, corev1.EventTypeWarning, commonutil.JobFailedValidationReason, "XGBoostJob failed validation because %s", err) + r.Recorder.Eventf(xgboostjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedValidationReason), + "XGBoostJob failed validation because %s", err) return ctrl.Result{}, err } @@ -397,12 +388,12 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub } msg := fmt.Sprintf("XGBoostJob %s is successfully completed.", xgboostJob.Name) logrus.Info(msg) - r.Recorder.Event(xgboostJob, corev1.EventTypeNormal, xgboostJobSucceededReason, msg) + r.Recorder.Event(xgboostJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, xgboostJobSucceededReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), msg) if err != nil { logger.Infof("Append job condition error: %v", err) return err @@ -417,8 +408,8 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub } if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { msg := fmt.Sprintf("XGBoostJob %s is restarting because %d %s replica(s) failed.", xgboostJob.Name, failed, rtype) - r.Recorder.Event(xgboostJob, corev1.EventTypeWarning, xgboostJobRestartingReason, msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, xgboostJobRestartingReason, msg) + r.Recorder.Event(xgboostJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRestartingReason), msg) 
if err != nil { logger.Infof("Append job condition error: %v", err) return err @@ -426,12 +417,12 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub trainingoperatorcommon.RestartedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName()) } else { msg := fmt.Sprintf("XGBoostJob %s is failed because %d %s replica(s) failed.", xgboostJob.Name, failed, rtype) - r.Recorder.Event(xgboostJob, corev1.EventTypeNormal, xgboostJobFailedReason, msg) + r.Recorder.Event(xgboostJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, xgboostJobFailedReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), msg) if err != nil { logger.Infof("Append job condition error: %v", err) return err @@ -499,7 +490,7 @@ func (r *XGBoostJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool msg := fmt.Sprintf("xgboostJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&xgboostJob.Status, kubeflowv1.JobCreated, xgboostJobCreatedReason, msg); err != nil { + if err := commonutil.UpdateJobConditions(&xgboostJob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), msg); err != nil { log.Log.Error(err, "append job condition error") return false } diff --git a/pkg/reconciler.v1/common/job.go b/pkg/reconciler.v1/common/job.go index d8c52f57a9..07ba630b8c 100644 --- a/pkg/reconciler.v1/common/job.go +++ b/pkg/reconciler.v1/common/job.go @@ -121,6 +121,7 @@ func (r *JobReconciler) ReconcileJob( logger := r.GetLogger(job) logger.Info(MsgReconcileStart) + jobKind := job.GetObjectKind().GroupVersionKind().Kind oldStatus := status.DeepCopy() @@ -211,9 +212,9 @@ func (r *JobReconciler) ReconcileJob( r.SetStatusForSuccessJob(status) } - r.GetRecorder().Event(job, corev1.EventTypeNormal, commonutil.JobFailedReason, failureMessage) + r.GetRecorder().Event(job, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) - if err = commonutil.UpdateJobConditions(status, kubeflowv1.JobFailed, commonutil.JobFailedReason, failureMessage); err != nil { + if err = commonutil.UpdateJobConditions(status, kubeflowv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil { logrus.Infof(ErrAppendJobConditionTemplate, err) return err } @@ -306,7 +307,7 @@ func (r *JobReconciler) UpdateJobStatus( if r.IsFlagReplicaTypeForJobStatus(string(rtype)) { if running > 0 { msg := fmt.Sprintf("%s %s is running.", jobKind, jobNamespacedName) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.JobRunningReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg) if err != nil { logger.Info(ErrAppendJobConditionTemplate, err) return err @@ -316,12 +317,12 @@ func (r *JobReconciler) UpdateJobStatus( if expected == 0 { msg := fmt.Sprintf("%s %s is successfully completed.", jobKind, jobNamespacedName) logrus.Info(msg) - r.GetRecorder().Event(job, corev1.EventTypeNormal, commonutil.JobSucceededReason, msg) + 
r.GetRecorder().Event(job, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.JobSucceededReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(jobKind, commonutil.JobSucceededReason), msg) if err != nil { logger.Info(ErrAppendJobConditionTemplate, err) } @@ -333,8 +334,8 @@ func (r *JobReconciler) UpdateJobStatus( if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { msg := fmt.Sprintf("%s %s is restarting because %d %s replica(s) failed.", jobKind, jobNamespacedName, failed, rtype) - r.GetRecorder().Event(job, corev1.EventTypeWarning, commonutil.JobRestartingReason, msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.JobRestartingReason, msg) + r.GetRecorder().Event(job, corev1.EventTypeWarning, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg) if err != nil { logger.Info(ErrAppendJobConditionTemplate, err) return err @@ -346,7 +347,7 @@ func (r *JobReconciler) UpdateJobStatus( now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.JobFailedReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), msg) if err != nil { logger.Info(ErrAppendJobConditionTemplate, err) return err @@ -359,7 +360,7 @@ func (r *JobReconciler) UpdateJobStatus( msg := fmt.Sprintf("%s %s is running.", jobKind, jobNamespacedName) logger.Info(msg) - if err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.JobRunningReason, msg); err != nil { + if err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg); err != nil { logger.Error(err, ErrUpdateJobConditionsFailed, jobKind) return err } diff --git a/pkg/util/status.go b/pkg/util/status.go index 11439ce85a..e268a33a8f 100644 --- a/pkg/util/status.go +++ b/pkg/util/status.go @@ -1,29 +1,33 @@ package util import ( - apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "fmt" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" ) const ( // JobCreatedReason is added in a job when it is created. - JobCreatedReason = "JobCreated" + JobCreatedReason = "Created" // JobSucceededReason is added in a job when it is succeeded. - JobSucceededReason = "JobSucceeded" + JobSucceededReason = "Succeeded" // JobRunningReason is added in a job when it is running. - JobRunningReason = "JobRunning" + JobRunningReason = "Running" // JobFailedReason is added in a job when it is failed. - JobFailedReason = "JobFailed" + JobFailedReason = "Failed" // JobRestartingReason is added in a job when it is restarting. - JobRestartingReason = "JobRestarting" + JobRestartingReason = "Restarting" // JobFailedValidationReason is added in a job when it failed validation - JobFailedValidationReason = "JobFailedValidation" - - // labels for pods and servers. 
- + JobFailedValidationReason = "FailedValidation" ) +func NewReason(kind, reason string) string { + return fmt.Sprintf("%s%s", kind, reason) +} + // IsSucceeded checks if the job is succeeded func IsSucceeded(status apiv1.JobStatus) bool { return hasCondition(status, apiv1.JobSucceeded)