Implement suspend semantics to PyTorchJob
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
tenzen-y committed Jul 11, 2023
1 parent 938a343 commit 383e6b7
Showing 24 changed files with 695 additions and 139 deletions.
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mpijobs.yaml
@@ -7702,6 +7702,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mxjobs.yaml
@@ -7701,6 +7701,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_paddlejobs.yaml
@@ -8212,6 +8212,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_pytorchjobs.yaml
@@ -8241,6 +8241,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
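For orientation: the generated `suspend` field sits under `spec.runPolicy` of each job kind. A minimal, hypothetical PyTorchJob manifest that starts out suspended might look like the sketch below; only `runPolicy.suspend` comes from this commit, and the job name, image, and replica layout are placeholders rather than part of the change.

```yaml
# Hypothetical usage sketch (not part of this commit's diff): a PyTorchJob
# created in the suspended state. The controller creates no Pods until
# suspend is flipped back to false.
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-suspend-example      # placeholder name
spec:
  runPolicy:
    suspend: true                    # new field introduced by this change
  pytorchReplicaSpecs:
    Worker:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: example.com/pytorch-training:latest   # placeholder image
```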
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_tfjobs.yaml
@@ -87,6 +87,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_xgboostjobs.yaml
@@ -83,6 +83,16 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: suspend specifies whether the Job controller should
create Pods or not. If a Job is created with suspend set to
true, no Pods are created by the Job controller. If a Job is
suspended after creation (i.e. the flag goes from false to true),
the Job controller will delete all active Pods and PodGroups
associated with this Job. Users must design their workload to
gracefully handle this.
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
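All six CRD snippets above carry the same `suspend` description because the field is added once to the shared RunPolicy API type and then regenerated into each manifest. That Go file is not among the diffs loaded here; inferred from the generated YAML (`type: boolean`, `default: false`), the API-side change is presumably roughly:

```go
// Sketch inferred from the generated CRD YAML above; the actual API file is
// not shown in this excerpt, so the field's position and exact markers are
// assumptions.
package v1

type RunPolicy struct {
	// ... existing fields such as CleanPodPolicy and TTLSecondsAfterFinished ...

	// suspend specifies whether the Job controller should create Pods or not.
	// If a Job is suspended after creation (i.e. the flag goes from false to
	// true), the Job controller will delete all active Pods and PodGroups
	// associated with this Job.
	// +kubebuilder:default=false
	// +optional
	Suspend *bool `json:"suspend,omitempty"`
}
```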
93 changes: 70 additions & 23 deletions pkg/controller.v1/common/job.go
@@ -26,6 +26,7 @@ import (
"github.com/kubeflow/training-operator/pkg/core"
commonutil "github.com/kubeflow/training-operator/pkg/util"
"github.com/kubeflow/training-operator/pkg/util/k8sutil"
trainutil "github.com/kubeflow/training-operator/pkg/util/train"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
@@ -38,13 +39,13 @@ import (
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)

func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job interface{}, pods []*corev1.Pod) error {
func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, runtimeObject runtime.Object, pods []*corev1.Pod) error {
if len(pods) == 0 {
return nil
}

// Delete nothing when the cleanPodPolicy is None.
if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone {
// Delete nothing when the cleanPodPolicy is None and the runPolicy.suspend is false.
if !trainutil.IsJobSuspended(runPolicy) && *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone {
return nil
}

@@ -55,11 +56,11 @@ func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job i
if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
continue
}
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
// Pod and service have the same name, thus the service could be deleted using pod's name.
if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil {
if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, runtimeObject); err != nil {
return err
}
}
@@ -118,22 +119,8 @@ func (jc *JobController) ReconcileJobs(

oldStatus := jobStatus.DeepCopy()
if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) {
// If the Job is succeed or failed, delete all pods and services.
if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
return err
}

if jc.Config.EnableGangScheduling() {
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup")
if err := jc.DeletePodGroup(metaObject); err != nil {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", jobName)
}
}

if err := jc.CleanupJob(runPolicy, jobStatus, job); err != nil {
// If the Job is succeed or failed, delete all pods, services, and podGroup.
if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil {
return err
}

@@ -155,6 +142,40 @@
return nil
}

if trainutil.IsJobSuspended(runPolicy) {
if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil {
return err
}
for rType := range jobStatus.ReplicaStatuses {
jobStatus.ReplicaStatuses[rType].Active = 0
}
jobStatus.StartTime = nil
msg := fmt.Sprintf("%s %s is suspended.", jc.Controller.GetAPIGroupVersionKind().Kind, jobName)
if commonutil.IsRunning(jobStatus) {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse,
commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
return err
}
}
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue,
commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
return err
}
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
if !reflect.DeepEqual(*oldStatus, jobStatus) {
return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
}
return nil
}
if !trainutil.IsJobSuspended(runPolicy) && commonutil.IsSuspend(jobStatus) {
msg := fmt.Sprintf("%s %s is resumed.", jc.Controller.GetAPIGroupVersionKind().Kind, jobName)
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse,
commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg); err != nil {
return err
}
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg)
}

// retrieve the previous number of retry
previousRetry := jc.WorkQueue.NumRequeues(jobKey)

@@ -205,7 +226,7 @@

// If the Job exceeds backoff limit or is past active deadline
// delete all pods and services, then set the status to failed
if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
if err := jc.DeletePodsAndServices(runPolicy, runtimeObject, pods); err != nil {
return err
}

@@ -225,7 +246,7 @@

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)

if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
log.Infof("Append job condition error: %v", err)
return err
}
@@ -344,6 +365,32 @@ func (jc *JobController) ReconcileJobs(
return nil
}

func (jc *JobController) CleanUpResources(
runPolicy *apiv1.RunPolicy,
runtimeObject runtime.Object,
metaObject metav1.Object,
jobStatus apiv1.JobStatus,
pods []*v1.Pod,
) error {
if err := jc.DeletePodsAndServices(runPolicy, runtimeObject, pods); err != nil {
return err
}
if jc.Config.EnableGangScheduling() {

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup")
if err := jc.DeletePodGroup(metaObject); err != nil {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", metaObject.GetName())
}
}
if err := jc.CleanupJob(runPolicy, jobStatus, runtimeObject); err != nil {
return err
}
return nil
}

// ResetExpectations reset the expectation for creates and deletes of pod/service to zero.
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error {
var allErrs error
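The suspend and resume branches added to ReconcileJobs above gate on the new `trainutil.IsJobSuspended` helper (the `trainutil` import added at the top of this file points at `pkg/util/train`). The helper's body is not in the loaded portion of the diff; assuming `RunPolicy.Suspend` is the optional boolean shown in the CRDs, a minimal implementation would look like:

```go
// Sketch only: the real helper lives in pkg/util/train and is not shown in
// this excerpt; the apiv1 import path is assumed from the project layout.
// A nil pointer (field unset) is treated as "not suspended".
package train

import (
	apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

func IsJobSuspended(runPolicy *apiv1.RunPolicy) bool {
	return runPolicy != nil && runPolicy.Suspend != nil && *runPolicy.Suspend
}
```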
2 changes: 1 addition & 1 deletion pkg/controller.v1/common/pod.go
@@ -361,7 +361,7 @@ func (jc *JobController) ReconcilePods(
metaObject.GetName(), rType)
jc.Recorder.Event(runtimeObject, v1.EventTypeWarning,
commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg)
if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting,
if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, v1.ConditionTrue,
commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg); err != nil {
commonutil.LoggerForJob(metaObject).Infof("Append job condition error: %v", err)
return err
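The one-line change above, and the matching call-site updates in the controller below, pass an explicit `v1.ConditionTrue`/`corev1.ConditionTrue` to `commonutil.UpdateJobConditions`, which implies the helper gained a condition-status parameter (the suspend path needs to set conditions to False as well as True). The helper itself is not in the loaded diff; a rough, assumed sketch of its new shape:

```go
// Assumed post-change shape of the helper in pkg/util (commonutil); the real
// implementation is not shown in this excerpt, and the condition-merging
// details here are illustrative.
package util

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

func UpdateJobConditions(
	jobStatus *apiv1.JobStatus,
	conditionType apiv1.JobConditionType,
	conditionStatus corev1.ConditionStatus, // new: callers now pass True/False explicitly
	reason, message string,
) error {
	now := metav1.Now()
	cond := apiv1.JobCondition{
		Type:               conditionType,
		Status:             conditionStatus,
		Reason:             reason,
		Message:            message,
		LastUpdateTime:     now,
		LastTransitionTime: now,
	}
	// Update an existing condition of the same type in place, otherwise append;
	// keep the old transition time when the status did not actually change.
	for i := range jobStatus.Conditions {
		if jobStatus.Conditions[i].Type == conditionType {
			if jobStatus.Conditions[i].Status == conditionStatus {
				cond.LastTransitionTime = jobStatus.Conditions[i].LastTransitionTime
			}
			jobStatus.Conditions[i] = cond
			return nil
		}
	}
	jobStatus.Conditions = append(jobStatus.Conditions, cond)
	return nil
}
```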
10 changes: 5 additions & 5 deletions pkg/controller.v1/mpi/mpijob_controller.go
@@ -320,7 +320,7 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated,
if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue,
commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg); err != nil {
log.Log.Error(err, "append job condition error")
return false
@@ -583,7 +583,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
if rtype == kubeflowv1.MPIJobReplicaTypeLauncher {
if running > 0 {
msg := fmt.Sprintf("MPIJob %s is running.", mpiJob.Name)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -598,7 +598,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -611,7 +611,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
msg := fmt.Sprintf("MPIJob %s is restarting because %d %s replica(s) failed.", mpiJob.Name, failed, rtype)
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -624,7 +624,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed,
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue,
commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)