diff --git a/pkg/common/interface.go b/pkg/common/interface.go
index 6e9e5f8dd8..e9ea457ecc 100644
--- a/pkg/common/interface.go
+++ b/pkg/common/interface.go
@@ -88,4 +88,7 @@ type ControllerInterface interface {
 	// It will requeue the job in case of an error while creating/deleting services.
 	// Common implementation will be provided and User can still override this to implement their own reconcile logic
 	ReconcileServices(job metav1.Object, services []*v1.Service, rtype apiv1.ReplicaType, spec *apiv1.ReplicaSpec) error
+
+	// GetFrameworkName returns the framework name (e.g., tensorflow).
+	GetFrameworkName() string
 }
diff --git a/pkg/controller.v1/common/pod.go b/pkg/controller.v1/common/pod.go
index ccb773ec08..0d975c52c4 100644
--- a/pkg/controller.v1/common/pod.go
+++ b/pkg/controller.v1/common/pod.go
@@ -21,6 +21,7 @@ import (
 	"strings"
 
 	apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
+	trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common"
 	"github.com/kubeflow/training-operator/pkg/controller.v1/control"
 	"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
 	"github.com/kubeflow/training-operator/pkg/core"
@@ -356,6 +357,14 @@ func (jc *JobController) ReconcilePods(
 				// Deletion is expected
 				jc.Expectations.RaiseExpectations(expectationPodsKey, 0, 1)
 
+				msg := fmt.Sprintf("job %s is restarting because %s replica(s) failed.",
+					metaObject.GetName(), rType)
+				jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, "JobRestarting", msg)
+				if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, "JobRestarting", msg); err != nil {
+					commonutil.LoggerForJob(metaObject).Infof("Append job condition error: %v", err)
+					return err
+				}
+				trainingoperatorcommon.RestartedJobsCounterInc(metaObject.GetNamespace(), jc.Controller.GetFrameworkName())
 			}
 
 			updateJobReplicaStatuses(jobStatus, rType, pod)
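The two hunks above introduce the GetFrameworkName() hook on ControllerInterface and use it from the shared ReconcilePods path, so the restart event, condition, and metric no longer need framework-specific overrides. A minimal, self-contained sketch of the pattern follows; the names demoReconciler and restartedJobsCounterInc are illustrative stand-ins, not code from this repository.

package main

import "fmt"

// ControllerInterface is trimmed here to the one method this sketch needs.
type ControllerInterface interface {
	GetFrameworkName() string
}

// demoReconciler plays the role of a framework reconciler (e.g., a TFJob reconciler).
type demoReconciler struct{}

// GetFrameworkName returns the framework name (e.g., tensorflow).
func (demoReconciler) GetFrameworkName() string { return "tensorflow" }

// restartedJobs stands in for the real namespace/framework-labelled counter.
var restartedJobs = map[[2]string]int{}

func restartedJobsCounterInc(namespace, framework string) {
	restartedJobs[[2]string{namespace, framework}]++
}

func main() {
	var c ControllerInterface = demoReconciler{}
	// Mirrors the call added to ReconcilePods: the framework label comes from
	// the controller itself rather than a hard-coded per-framework constant.
	restartedJobsCounterInc("default", c.GetFrameworkName())
	fmt.Println(restartedJobs) // map[[default tensorflow]:1]
}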
diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go
index e3a349a3f3..e88479f874 100644
--- a/pkg/controller.v1/mpi/mpijob_controller.go
+++ b/pkg/controller.v1/mpi/mpijob_controller.go
@@ -277,6 +277,10 @@ func (jc *MPIJobReconciler) GetGroupNameLabelValue() string {
 	return kubeflowv1.GroupVersion.Group
 }
 
+func (jc *MPIJobReconciler) GetFrameworkName() string {
+	return kubeflowv1.MPIJobFrameworkName
+}
+
 // SetClusterSpec is overridden because no cluster spec is needed for MPIJob
 func (jc *MPIJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error {
 	return nil
@@ -314,7 +318,7 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
 		jc.Scheme.Default(mpiJob)
 		msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName())
 		logrus.Info(msg)
-		trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, kubeflowv1.MPIJobFrameworkName)
+		trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 		if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, mpiJobCreatedReason, msg); err != nil {
 			log.Log.Error(err, "append job condition error")
 			return false
@@ -546,7 +550,7 @@ func (jc *MPIJobReconciler) DeleteJob(job interface{}) error {
 
 	jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", mpiJob.Name)
 	log.Infof("job %s/%s has been deleted", mpiJob.Namespace, mpiJob.Name)
-	trainingoperatorcommon.DeletedJobsCounterInc(mpiJob.Namespace, kubeflowv1.MPIJobFrameworkName)
+	trainingoperatorcommon.DeletedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 	return nil
 }
 
@@ -597,7 +601,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 				commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.SuccessfulJobsCounterInc(mpiJob.Namespace, kubeflowv1.MPIJobFrameworkName)
+			trainingoperatorcommon.SuccessfulJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 			return nil
 		}
 	}
@@ -610,7 +614,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 				commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.RestartedJobsCounterInc(mpiJob.Namespace, kubeflowv1.MPIJobFrameworkName)
+			trainingoperatorcommon.RestartedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 		} else {
 			msg := fmt.Sprintf("MPIJob %s is failed because %d %s replica(s) failed.", mpiJob.Name, failed, rtype)
 			jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.JobFailedReason, msg)
@@ -623,7 +627,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 				commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.FailedJobsCounterInc(mpiJob.Namespace, kubeflowv1.MPIJobFrameworkName)
+			trainingoperatorcommon.FailedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 		}
 	}
 }
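The CreatedJobsCounterInc, DeletedJobsCounterInc, SuccessfulJobsCounterInc, RestartedJobsCounterInc, and FailedJobsCounterInc helpers used throughout these hunks live in pkg/common and are not part of this diff. The sketch below shows how such per-namespace, per-framework counters are typically built with prometheus/client_golang; the metric and label names here are assumptions for illustration, not the repository's actual definitions.

// Hypothetical metrics helper; metric and label names are assumptions.
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// restartedJobsCount counts restarted jobs, labelled by namespace and framework.
var restartedJobsCount = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "training_operator_jobs_restarted_total",
	Help: "Number of restarted training jobs, partitioned by namespace and framework.",
}, []string{"job_namespace", "framework"})

// RestartedJobsCounterInc increments the restart counter for one job; the
// framework value is whatever the reconciler's GetFrameworkName() returns.
func RestartedJobsCounterInc(namespace, framework string) {
	restartedJobsCount.WithLabelValues(namespace, framework).Inc()
}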
diff --git a/pkg/controller.v1/mxnet/mxjob_controller.go b/pkg/controller.v1/mxnet/mxjob_controller.go
index 82920d002d..6836101d29 100644
--- a/pkg/controller.v1/mxnet/mxjob_controller.go
+++ b/pkg/controller.v1/mxnet/mxjob_controller.go
@@ -257,6 +257,10 @@ func (r *MXJobReconciler) GetGroupNameLabelValue() string {
 	return kubeflowv1.GroupVersion.Group
 }
 
+func (r *MXJobReconciler) GetFrameworkName() string {
+	return kubeflowv1.MXJobFrameworkName
+}
+
 func (r *MXJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) {
 	job := &kubeflowv1.MXJob{}
 	err := r.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: name}, job)
@@ -331,7 +335,7 @@ func (r *MXJobReconciler) DeleteJob(job interface{}) error {
 	}
 	r.Recorder.Eventf(mxjob, corev1.EventTypeNormal, control.SuccessfulDeletePodReason, "Deleted job: %v", mxjob.Name)
 	logrus.Info("job deleted", "namespace", mxjob.Namespace, "name", mxjob.Name)
-	trainingoperatorcommon.DeletedJobsCounterInc(mxjob.Namespace, kubeflowv1.MXJobFrameworkName)
+	trainingoperatorcommon.DeletedJobsCounterInc(mxjob.Namespace, r.GetFrameworkName())
 	return nil
 }
 
@@ -394,7 +398,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 				logrus.Infof("Append mxjob condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.SuccessfulJobsCounterInc(mxjob.Namespace, kubeflowv1.MXJobFrameworkName)
+			trainingoperatorcommon.SuccessfulJobsCounterInc(mxjob.Namespace, r.GetFrameworkName())
 			return nil
 		}
 	}
@@ -407,7 +411,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 				logrus.Infof("Append job condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.RestartedJobsCounterInc(mxjob.Namespace, kubeflowv1.MXJobFrameworkName)
+			trainingoperatorcommon.RestartedJobsCounterInc(mxjob.Namespace, r.GetFrameworkName())
 		} else {
 			msg := fmt.Sprintf("mxjob %s is failed because %d %s replica(s) failed.", mxjob.Name, failed, rtype)
 			r.Recorder.Event(mxjob, corev1.EventTypeNormal, mxJobFailedReason, msg)
@@ -420,7 +424,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 				logrus.Infof("Append job condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.FailedJobsCounterInc(mxjob.Namespace, kubeflowv1.MXJobFrameworkName)
+			trainingoperatorcommon.FailedJobsCounterInc(mxjob.Namespace, r.GetFrameworkName())
 		}
 	}
 }
@@ -481,7 +485,7 @@ func (r *MXJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
 		r.Scheme.Default(mxJob)
 		msg := fmt.Sprintf("MXJob %s is created.", e.Object.GetName())
 		logrus.Info(msg)
-		trainingoperatorcommon.CreatedJobsCounterInc(mxJob.Namespace, kubeflowv1.MXJobFrameworkName)
+		trainingoperatorcommon.CreatedJobsCounterInc(mxJob.Namespace, r.GetFrameworkName())
 		if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, "MXJobCreated", msg); err != nil {
 			logrus.Error(err, "append job condition error")
 			return false
msg := fmt.Sprintf("PaddleJob %s/%s is running.", @@ -452,7 +456,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err } - trainingoperatorcommon.RestartedJobsCounterInc(paddlejob.Namespace, kubeflowv1.PaddleJobFrameworkName) + trainingoperatorcommon.RestartedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName()) } else { msg := fmt.Sprintf("PaddleJob %s is failed because %d %s replica(s) failed.", paddlejob.Name, failed, rtype) r.Recorder.Event(paddlejob, corev1.EventTypeNormal, commonutil.JobFailedReason, msg) @@ -465,7 +469,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err } - trainingoperatorcommon.FailedJobsCounterInc(paddlejob.Namespace, kubeflowv1.PaddleJobFrameworkName) + trainingoperatorcommon.FailedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName()) } } } @@ -544,7 +548,7 @@ func (r *PaddleJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { r.Scheme.Default(paddlejob) msg := fmt.Sprintf("PaddleJob %s is created.", e.Object.GetName()) logrus.Info(msg) - trainingoperatorcommon.CreatedJobsCounterInc(paddlejob.Namespace, kubeflowv1.PaddleJobFrameworkName) + trainingoperatorcommon.CreatedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName()) if err := commonutil.UpdateJobConditions(&paddlejob.Status, kubeflowv1.JobCreated, "PaddleJobCreated", msg); err != nil { logrus.Error(err, "append job condition error") return false diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller.go b/pkg/controller.v1/pytorch/pytorchjob_controller.go index 8cb378fb15..7ee3b6f3b9 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller.go @@ -252,6 +252,10 @@ func (r *PyTorchJobReconciler) GetGroupNameLabelValue() string { return kubeflowv1.GroupVersion.Group } +func (r *PyTorchJobReconciler) GetFrameworkName() string { + return kubeflowv1.PytorchJobFrameworkName +} + func (r *PyTorchJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) { job := &kubeflowv1.PyTorchJob{} err := r.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: name}, job) @@ -328,7 +332,7 @@ func (r *PyTorchJobReconciler) DeleteJob(job interface{}) error { } r.recorder.Eventf(pytorchjob, corev1.EventTypeNormal, control.SuccessfulDeletePodReason, "Deleted job: %v", pytorchjob.Name) logrus.Info("job deleted", "namespace", pytorchjob.Namespace, "name", pytorchjob.Name) - trainingoperatorcommon.DeletedJobsCounterInc(pytorchjob.Namespace, kubeflowv1.PytorchJobFrameworkName) + trainingoperatorcommon.DeletedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) return nil } @@ -407,7 +411,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err } - trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, kubeflowv1.PytorchJobFrameworkName) + trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) return nil } } @@ -431,7 +435,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, commonutil.LoggerForJob(pytorchjob).Infof("Append pytorchjob condition error: %v", err) return err } - trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, kubeflowv1.PytorchJobFrameworkName) + 
diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller.go b/pkg/controller.v1/pytorch/pytorchjob_controller.go
index 8cb378fb15..7ee3b6f3b9 100644
--- a/pkg/controller.v1/pytorch/pytorchjob_controller.go
+++ b/pkg/controller.v1/pytorch/pytorchjob_controller.go
@@ -252,6 +252,10 @@ func (r *PyTorchJobReconciler) GetGroupNameLabelValue() string {
 	return kubeflowv1.GroupVersion.Group
 }
 
+func (r *PyTorchJobReconciler) GetFrameworkName() string {
+	return kubeflowv1.PytorchJobFrameworkName
+}
+
 func (r *PyTorchJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) {
 	job := &kubeflowv1.PyTorchJob{}
 	err := r.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: name}, job)
@@ -328,7 +332,7 @@ func (r *PyTorchJobReconciler) DeleteJob(job interface{}) error {
 	}
 	r.recorder.Eventf(pytorchjob, corev1.EventTypeNormal, control.SuccessfulDeletePodReason, "Deleted job: %v", pytorchjob.Name)
 	logrus.Info("job deleted", "namespace", pytorchjob.Namespace, "name", pytorchjob.Name)
-	trainingoperatorcommon.DeletedJobsCounterInc(pytorchjob.Namespace, kubeflowv1.PytorchJobFrameworkName)
+	trainingoperatorcommon.DeletedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
 	return nil
 }
 
@@ -407,7 +411,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
 				commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, kubeflowv1.PytorchJobFrameworkName)
+			trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
 			return nil
 		}
 	}
@@ -431,7 +435,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
 				commonutil.LoggerForJob(pytorchjob).Infof("Append pytorchjob condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, kubeflowv1.PytorchJobFrameworkName)
+			trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
 		} else if running > 0 {
 			// Some workers are still running, leave a running condition.
 			msg := fmt.Sprintf("PyTorchJob %s/%s is running.",
@@ -454,7 +458,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
 				commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.RestartedJobsCounterInc(pytorchjob.Namespace, kubeflowv1.PytorchJobFrameworkName)
+			trainingoperatorcommon.RestartedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
 		} else {
 			msg := fmt.Sprintf("PyTorchJob %s is failed because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype)
 			r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.JobFailedReason, msg)
@@ -467,7 +471,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
 				commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.FailedJobsCounterInc(pytorchjob.Namespace, kubeflowv1.PytorchJobFrameworkName)
+			trainingoperatorcommon.FailedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
 		}
 	}
 }
@@ -547,7 +551,7 @@ func (r *PyTorchJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool
 		r.Scheme.Default(pytorchjob)
 		msg := fmt.Sprintf("PyTorchJob %s is created.", e.Object.GetName())
 		logrus.Info(msg)
-		trainingoperatorcommon.CreatedJobsCounterInc(pytorchjob.Namespace, kubeflowv1.PytorchJobFrameworkName)
+		trainingoperatorcommon.CreatedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
 		if err := commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, "PyTorchJobCreated", msg); err != nil {
 			logrus.Error(err, "append job condition error")
 			return false
diff --git a/pkg/controller.v1/tensorflow/tfjob_controller.go b/pkg/controller.v1/tensorflow/tfjob_controller.go
index 904347359b..f5dbbabd50 100644
--- a/pkg/controller.v1/tensorflow/tfjob_controller.go
+++ b/pkg/controller.v1/tensorflow/tfjob_controller.go
@@ -17,7 +17,6 @@ package tensorflow
 import (
 	"context"
 	"fmt"
-	"strconv"
 	"strings"
 	"time"
 
@@ -28,7 +27,6 @@ import (
 	"github.com/kubeflow/training-operator/pkg/controller.v1/control"
 	"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
 	commonutil "github.com/kubeflow/training-operator/pkg/util"
-	trainutil "github.com/kubeflow/training-operator/pkg/util/train"
 
 	"github.com/go-logr/logr"
 	"github.com/sirupsen/logrus"
@@ -63,8 +61,6 @@ const (
 	tfJobRunningReason = "TFJobRunning"
 	// tfJobFailedReason is added in a tfjob when it is failed.
 	tfJobFailedReason = "TFJobFailed"
-	// tfJobRestarting is added in a tfjob when it is restarting.
-	tfJobRestartingReason = "TFJobRestarting"
 
 	FailedDeleteJobReason     = "FailedDeleteJob"
 	SuccessfulDeleteJobReason = "SuccessfulDeleteJob"
@@ -73,14 +69,6 @@ const (
 
 	// tfConfig is the environment variable name of TensorFlow cluster spec.
 	tfConfig = "TF_CONFIG"
-	// exitedWithCodeReason is the normal reason when the pod is exited because of the exit code.
-	exitedWithCodeReason = "ExitedWithCode"
-	// podTemplateRestartPolicyReason is the warning reason when the restart
-	// policy is set in pod template.
-	podTemplateRestartPolicyReason = "SettedPodTemplateRestartPolicy"
-	// podTemplateSchedulerNameReason is the warning reason when other scheduler name is set
-	// in pod templates with gang-scheduling enabled
-	podTemplateSchedulerNameReason = "SettedPodTemplateSchedulerName"
 )
 
 func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc) *TFJobReconciler {
@@ -262,6 +250,10 @@ func (r *TFJobReconciler) GetGroupNameLabelValue() string {
 	return kubeflowv1.GroupVersion.Group
 }
 
+func (r *TFJobReconciler) GetFrameworkName() string {
+	return kubeflowv1.TFJobFrameworkName
+}
+
 func (r *TFJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) {
 	tfjob := &kubeflowv1.TFJob{}
 	err := r.Get(context.Background(), types.NamespacedName{
@@ -388,7 +380,7 @@ func (r *TFJobReconciler) DeleteJob(job interface{}) error {
 
 	r.recorder.Eventf(tfJob, v1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", tfJob.Name)
 	log.Infof("job %s/%s has been deleted", tfJob.Namespace, tfJob.Name)
-	trainingoperatorcommon.DeletedJobsCounterInc(tfJob.Namespace, kubeflowv1.TFJobFrameworkName)
+	trainingoperatorcommon.DeletedJobsCounterInc(tfJob.Namespace, r.GetFrameworkName())
 	return nil
 }
 
@@ -490,7 +482,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 					commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
 					return err
 				}
-				trainingoperatorcommon.SuccessfulJobsCounterInc(tfJob.Namespace, kubeflowv1.TFJobFrameworkName)
+				trainingoperatorcommon.SuccessfulJobsCounterInc(tfJob.Namespace, r.GetFrameworkName())
 			}
 		}
 	} else {
@@ -512,7 +504,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 				commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.SuccessfulJobsCounterInc(tfJob.Namespace, kubeflowv1.TFJobFrameworkName)
+			trainingoperatorcommon.SuccessfulJobsCounterInc(tfJob.Namespace, r.GetFrameworkName())
 		} else if running > 0 {
 			// Some workers are still running, leave a running condition.
 			msg := fmt.Sprintf("TFJob %s/%s is running.",
@@ -538,7 +530,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 			}
 			// job is restarting, no need to set it failed
 			// we know it because we update the status condition when reconciling the replicas
-			trainingoperatorcommon.RestartedJobsCounterInc(tfJob.Namespace, kubeflowv1.TFJobFrameworkName)
+			trainingoperatorcommon.RestartedJobsCounterInc(tfJob.Namespace, r.GetFrameworkName())
 		} else {
 			if tfJob.Spec.EnableDynamicWorker && rtype == kubeflowv1.TFJobReplicaTypeWorker {
 				commonutil.LoggerForJob(tfJob).Infof("TFJob %s/%s continues regardless %d Worker replica(s) failed as enableDynamicWorker is set true.",
@@ -558,7 +550,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 				commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.FailedJobsCounterInc(tfJob.Namespace, kubeflowv1.TFJobFrameworkName)
+			trainingoperatorcommon.FailedJobsCounterInc(tfJob.Namespace, r.GetFrameworkName())
 		}
 	}
 }
@@ -701,201 +693,6 @@ func (r *TFJobReconciler) getPodSlices(tfjob *kubeflowv1.TFJob, replicasNum *int
 	return podSlices, nil
 }
 
-// In order to minimize the changes, we copy TFController's logic here to override kubeflow/commons reconcile logic
-// This should be removed later unless TF has specific logics there
-// reconcilePods checks and updates pods for each given TFReplicaSpec.
-// It will requeue the tfjob in case of an error while creating/deleting pods.
-func (r *TFJobReconciler) ReconcilePods(
-	job interface{},
-	jobStatus *kubeflowv1.JobStatus,
-	pods []*v1.Pod,
-	rtype kubeflowv1.ReplicaType,
-	spec *kubeflowv1.ReplicaSpec,
-	replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec,
-) error {
-
-	tfJob, ok := job.(*kubeflowv1.TFJob)
-	if !ok {
-		return fmt.Errorf("%v is not a type of TFJob", tfJob)
-	}
-
-	// Convert ReplicaType to lower string.
-	rt := strings.ToLower(string(rtype))
-	logger := commonutil.LoggerForJob(tfJob)
-	// Get all pods for the type rt.
-	pods, err := r.FilterPodsForReplicaType(pods, rt)
-	if err != nil {
-		return err
-	}
-	numReplicas := int(*spec.Replicas)
-	masterRole := false
-	//restart := false
-	//worker0Completed := false
-
-	initializeReplicaStatuses(jobStatus, rtype)
-
-	// GetPodSlices will return enough information here to make decision to add/remove/update resources.
-	//
-	// For example, let's assume we have pods with replica-index 0, 1, 2
-	// If replica is 4, return a slice with size 4. [[0],[1],[2],[]], a pod with replica-index 3 will be created.
-	//
-	// If replica is 1, return a slice with size 3. [[0],[1],[2]], pod with replica-index 1 and 2 are out of range and will be deleted.
-	podSlices := r.GetPodSlices(pods, numReplicas, logger)
-	for index, podSlice := range podSlices {
-		if len(podSlice) > 1 {
-			logger.Warningf("We have too many pods for %s %d", rt, index)
-		} else if len(podSlice) == 0 {
-			logger.Infof("Need to create new pod: %s-%d", rt, index)
-
-			// check if this replica is the master role
-			masterRole = r.IsMasterRole(replicas, rtype, index)
-			// TODO: [should change to CreateNewPod]
-			err = r.createNewPod(tfJob, rt, strconv.Itoa(index), spec, masterRole, replicas)
-			if err != nil {
-				return err
-			}
-		} else {
-			// Check the status of the current pod.
-			pod := podSlice[0]
-
-			// check if the index is in the valid range, if not, we should kill the pod
-			if index < 0 || index >= numReplicas {
-				err = r.PodControl.DeletePod(pod.Namespace, pod.Name, tfJob)
-				if err != nil {
-					return err
-				}
-			}
-			// Get the exit code of the container.
-			var exitCode int32 = 0xbeef // magic number
-			for _, status := range pod.Status.ContainerStatuses {
-				state := status.State
-				if status.Name == r.GetDefaultContainerName() && state.Terminated != nil {
-					exitCode = state.Terminated.ExitCode
-					logger.Infof("Pod: %v.%v exited with code %v", pod.Namespace, pod.Name, exitCode)
-					r.Recorder.Eventf(tfJob, v1.EventTypeNormal, exitedWithCodeReason, "Pod: %v.%v exited with code %v", pod.Namespace, pod.Name, exitCode)
-				}
-			}
-			// Check if the pod is retryable.
-			if pod.Status.Phase == v1.PodFailed &&
-				(spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode && trainutil.IsRetryableExitCode(exitCode) ||
-					spec.RestartPolicy == kubeflowv1.RestartPolicyOnFailure ||
-					spec.RestartPolicy == kubeflowv1.RestartPolicyAlways) {
-				logger.Infof("Need to restart the pod: %v.%v", pod.Namespace, pod.Name)
-				if err := r.PodControl.DeletePod(pod.Namespace, pod.Name, tfJob); err != nil {
-					return err
-				}
-
-				// with common library framework, we have to handle restart status here
-				// or we won't know which replica has been restarted in updateJobStatus after reconciling all replicas
-				msg := fmt.Sprintf("TFJob %s is restarting because %s replica(s) failed.",
-					tfJob.Name, rtype)
-				r.Recorder.Event(tfJob, corev1.EventTypeWarning, tfJobRestartingReason, msg)
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, tfJobRestartingReason, msg)
-				if err != nil {
-					commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
-					return err
-				}
-				trainingoperatorcommon.RestartedJobsCounterInc(tfJob.Namespace, kubeflowv1.TFJobFrameworkName)
-			}
-
-			updateJobReplicaStatuses(jobStatus, rtype, pod)
-		}
-	}
-	return nil
-}
-
-// createNewPod creates a new pod for the given index and type.
-func (r *TFJobReconciler) createNewPod(tfjob *kubeflowv1.TFJob, rt, index string, spec *kubeflowv1.ReplicaSpec, masterRole bool,
-	replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) error {
-
-	tfjobKey, err := common.KeyFunc(tfjob)
-	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("couldn't get key for tfjob object %#v: %v", tfjob, err))
-		return err
-	}
-	expectationPodsKey := expectation.GenExpectationPodsKey(tfjobKey, rt)
-	err = r.Expectations.ExpectCreations(expectationPodsKey, 1)
-	if err != nil {
-		return err
-	}
-	logger := commonutil.LoggerForReplica(tfjob, rt)
-	// Create OwnerReference.
-	controllerRef := r.GenOwnerReference(tfjob)
-
-	// Set type and index for the worker.
-	labels := r.GenLabels(tfjob.Name)
-	labels[kubeflowv1.ReplicaTypeLabel] = rt
-	labels[kubeflowv1.ReplicaIndexLabel] = index
-
-	if masterRole {
-		labels[kubeflowv1.JobRoleLabel] = "master"
-	}
-
-	podTemplate := spec.Template.DeepCopy()
-
-	// Set name for the template.
-	podTemplate.Name = common.GenGeneralName(tfjob.Name, rt, index)
-
-	if podTemplate.Labels == nil {
-		podTemplate.Labels = make(map[string]string)
-	}
-
-	for key, value := range labels {
-		podTemplate.Labels[key] = value
-	}
-
-	if err := r.SetClusterSpec(tfjob, podTemplate, rt, index); err != nil {
-		return err
-	}
-
-	// Submit a warning event if the user specifies restart policy for
-	// the pod template. We recommend to set it from the replica level.
-	if podTemplate.Spec.RestartPolicy != v1.RestartPolicy("") {
-		errMsg := "Restart policy in pod template will be overwritten by restart policy in replica spec"
-		logger.Warning(errMsg)
-		r.Recorder.Event(tfjob, v1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg)
-	}
-	setRestartPolicy(podTemplate, spec)
-
-	// if gang-scheduling is enabled:
-	// 1. if user has specified other scheduler, we report a warning without overriding any fields.
-	// 2. if no SchedulerName is set for pods, then we set the SchedulerName to "volcano".
-	if r.Config.EnableGangScheduling() {
-		podSchedulerName := util.GetSchedulerName(replicas)
-		gangSchedulerName := r.PodGroupControl.GetSchedulerName()
-
-		if len(podSchedulerName) == 0 {
-			podTemplate.Spec.SchedulerName = gangSchedulerName
-		} else if strings.Compare(podSchedulerName, gangSchedulerName) != 0 {
-			errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
-			logger.Warning(errMsg)
-			r.Recorder.Event(tfjob, v1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg)
-		}
-
-		r.PodGroupControl.DecoratePodTemplateSpec(podTemplate, tfjob, rt)
-	}
-
-	err = r.PodControl.CreatePodsWithControllerRef(tfjob.Namespace, podTemplate, tfjob, controllerRef)
-	if err != nil && errors.IsTimeout(err) {
-		// Pod is created but its initialization has timed out.
-		// If the initialization is successful eventually, the
-		// controller will observe the creation via the informer.
-		// If the initialization fails, or if the pod keeps
-		// uninitialized for a long time, the informer will not
-		// receive any update, and the controller will create a new
-		// pod when the expectation expires.
-		return nil
-	} else if err != nil {
-		// Decrement the expected number of creates because the informer won't observe this pod
-		logger.Infof(
-			"Failed creation, decrementing expectations for tfjob %s/%s, key %s",
-			tfjob.Namespace, tfjob.Name, expectationPodsKey)
-		r.Expectations.CreationObserved(expectationPodsKey)
-		return err
-	}
-	return nil
-}
-
 // onOwnerCreateFunc modify creation condition.
 func (r *TFJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
 	return func(e event.CreateEvent) bool {
@@ -907,7 +704,7 @@ func (r *TFJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool
 		r.Scheme.Default(tfJob)
 		msg := fmt.Sprintf("TFJob %s is created.", e.Object.GetName())
 		logrus.Info(msg)
-		trainingoperatorcommon.CreatedJobsCounterInc(tfJob.Namespace, kubeflowv1.TFJobFrameworkName)
+		trainingoperatorcommon.CreatedJobsCounterInc(tfJob.Namespace, r.GetFrameworkName())
 		if err := commonutil.UpdateJobConditions(&tfJob.Status, kubeflowv1.JobCreated, "TFJobCreated", msg); err != nil {
 			log.Log.Error(err, "append job condition error")
 			return false
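The removed TFJob-specific ReconcilePods above decided per pod whether a failure should trigger a restart: RestartPolicyExitCode restarts only on a retryable exit code, while RestartPolicyOnFailure and RestartPolicyAlways restart on any failure. The predicate below restates that condition for clarity; it is an illustration of the logic this change expects the shared controller to cover, not the actual common implementation, and isRetryableExitCode merely stands in for trainutil.IsRetryableExitCode.

package main

import "fmt"

type RestartPolicy string

const (
	RestartPolicyAlways    RestartPolicy = "Always"
	RestartPolicyOnFailure RestartPolicy = "OnFailure"
	RestartPolicyExitCode  RestartPolicy = "ExitCode"
)

// isRetryableExitCode stands in for trainutil.IsRetryableExitCode; treating
// codes above 128 (termination by signal) as retryable is an assumption here.
func isRetryableExitCode(code int32) bool { return code > 128 }

// shouldRestart mirrors the condition applied to failed pods in the removed code.
func shouldRestart(policy RestartPolicy, podFailed bool, exitCode int32) bool {
	if !podFailed {
		return false
	}
	switch policy {
	case RestartPolicyExitCode:
		return isRetryableExitCode(exitCode)
	case RestartPolicyOnFailure, RestartPolicyAlways:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(shouldRestart(RestartPolicyExitCode, true, 137)) // true: killed by signal
	fmt.Println(shouldRestart(RestartPolicyExitCode, true, 1))   // false: permanent failure
}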
diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller.go b/pkg/controller.v1/xgboost/xgboostjob_controller.go
index 8003f4729e..1fff2d3761 100644
--- a/pkg/controller.v1/xgboost/xgboostjob_controller.go
+++ b/pkg/controller.v1/xgboost/xgboostjob_controller.go
@@ -257,6 +257,10 @@ func (r *XGBoostJobReconciler) GetGroupNameLabelValue() string {
 	return kubeflowv1.GroupVersion.Group
 }
 
+func (r *XGBoostJobReconciler) GetFrameworkName() string {
+	return kubeflowv1.XGBoostJobFrameworkName
+}
+
 // GetJobFromInformerCache returns the Job from Informer Cache
 func (r *XGBoostJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) {
 	job := &kubeflowv1.XGBoostJob{}
@@ -339,7 +343,7 @@ func (r *XGBoostJobReconciler) DeleteJob(job interface{}) error {
 	}
 	r.recorder.Eventf(xgboostjob, corev1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", xgboostjob.Name)
 	r.Log.Info("job deleted", "namespace", xgboostjob.Namespace, "name", xgboostjob.Name)
-	trainingoperatorcommon.DeletedJobsCounterInc(xgboostjob.Namespace, kubeflowv1.XGBoostJobFrameworkName)
+	trainingoperatorcommon.DeletedJobsCounterInc(xgboostjob.Namespace, r.GetFrameworkName())
 	return nil
 }
 
@@ -403,7 +407,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub
 				logger.Infof("Append job condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.SuccessfulJobsCounterInc(xgboostJob.Namespace, kubeflowv1.XGBoostJobFrameworkName)
+			trainingoperatorcommon.SuccessfulJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName())
 			return nil
 		}
 	}
@@ -419,7 +423,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub
 				logger.Infof("Append job condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.RestartedJobsCounterInc(xgboostJob.Namespace, kubeflowv1.XGBoostJobFrameworkName)
+			trainingoperatorcommon.RestartedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName())
 		} else {
 			msg := fmt.Sprintf("XGBoostJob %s is failed because %d %s replica(s) failed.", xgboostJob.Name, failed, rtype)
 			r.Recorder.Event(xgboostJob, corev1.EventTypeNormal, xgboostJobFailedReason, msg)
@@ -432,7 +436,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub
 				logger.Infof("Append job condition error: %v", err)
 				return err
 			}
-			trainingoperatorcommon.FailedJobsCounterInc(xgboostJob.Namespace, kubeflowv1.XGBoostJobFrameworkName)
+			trainingoperatorcommon.FailedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName())
 		}
 	}
 }
@@ -494,7 +498,7 @@ func (r *XGBoostJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool
 		r.Scheme.Default(xgboostJob)
 		msg := fmt.Sprintf("xgboostJob %s is created.", e.Object.GetName())
 		logrus.Info(msg)
-		trainingoperatorcommon.CreatedJobsCounterInc(xgboostJob.Namespace, kubeflowv1.XGBoostJobFrameworkName)
+		trainingoperatorcommon.CreatedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName())
 		if err := commonutil.UpdateJobConditions(&xgboostJob.Status, kubeflowv1.JobCreated, xgboostJobCreatedReason, msg); err != nil {
 			log.Log.Error(err, "append job condition error")
 			return false