Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Pytorch with PyTorch #1874

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions pkg/apis/kubeflow.org/v1/pytorch_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ var (
DefaultNprocPerNode = "auto"
)

func addPytorchDefaultingFuncs(scheme *runtime.Scheme) error {
func addPyTorchDefaultingFuncs(scheme *runtime.Scheme) error {
return RegisterDefaults(scheme)
}

// setPytorchDefaultPort sets the default ports for pytorch container.
func setPytorchDefaultPort(spec *corev1.PodSpec) {
index := getDefaultContainerIndex(spec, PytorchJobDefaultContainerName)
if ok := hasDefaultPort(spec, index, PytorchJobDefaultPortName); !ok {
setDefaultPort(spec, PytorchJobDefaultPortName, PytorchJobDefaultPort, index)
// setPyTorchDefaultPort sets the default ports for pytorch container.
func setPyTorchDefaultPort(spec *corev1.PodSpec) {
index := getDefaultContainerIndex(spec, PyTorchJobDefaultContainerName)
if ok := hasDefaultPort(spec, index, PyTorchJobDefaultPortName); !ok {
setDefaultPort(spec, PyTorchJobDefaultPortName, PyTorchJobDefaultPort, index)
}
}

Expand All @@ -54,8 +54,8 @@ func setElasticPolicy(pytorchJob *PyTorchJob) {
}
}

// setPytorchTypeNamesToCamelCase sets the name of all replica types from any case to correct case.
func setPytorchTypeNamesToCamelCase(pytorchJob *PyTorchJob) {
// setPyTorchTypeNamesToCamelCase sets the name of all replica types from any case to correct case.
func setPyTorchTypeNamesToCamelCase(pytorchJob *PyTorchJob) {
replicaTypes := []ReplicaType{
PyTorchJobReplicaTypeMaster,
PyTorchJobReplicaTypeWorker,
Expand All @@ -81,12 +81,12 @@ func SetDefaults_PyTorchJob(job *PyTorchJob) {
}

// Update the key of PyTorchReplicaSpecs to camel case.
setPytorchTypeNamesToCamelCase(job)
setPyTorchTypeNamesToCamelCase(job)

for _, spec := range job.Spec.PyTorchReplicaSpecs {
setDefaultReplicas(spec, 1)
setDefaultRestartPolicy(spec, PytorchJobDefaultRestartPolicy)
setPytorchDefaultPort(&spec.Template.Spec)
setDefaultRestartPolicy(spec, PyTorchJobDefaultRestartPolicy)
setPyTorchDefaultPort(&spec.Template.Spec)
}
// Set default elastic policy.
setElasticPolicy(job)
Expand Down
34 changes: 17 additions & 17 deletions pkg/apis/kubeflow.org/v1/pytorch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,23 @@ import (
)

const (
// PytorchJobDefaultPortName is name of the port used to communicate between Master and
// PyTorchJobDefaultPortName is name of the port used to communicate between Master and
// workers.
PytorchJobDefaultPortName = "pytorchjob-port"
// PytorchJobDefaultContainerName is the name of the PyTorchJob container.
PytorchJobDefaultContainerName = "pytorch"
// PytorchJobDefaultPort is default value of the port.
PytorchJobDefaultPort = 23456
// PytorchJobDefaultRestartPolicy is default RestartPolicy for PyTorchReplicaSpec.
PytorchJobDefaultRestartPolicy = RestartPolicyOnFailure
// PytorchJobKind is the kind name.
PytorchJobKind = "PyTorchJob"
// PytorchJobPlural is the PytorchPlural for pytorchJob.
PytorchJobPlural = "pytorchjobs"
// PytorchJobSingular is the singular for pytorchJob.
PytorchJobSingular = "pytorchjob"
// PytorchJobFrameworkName is the name of the ML Framework
PytorchJobFrameworkName = "pytorch"
PyTorchJobDefaultPortName = "pytorchjob-port"
// PyTorchJobDefaultContainerName is the name of the PyTorchJob container.
PyTorchJobDefaultContainerName = "pytorch"
// PyTorchJobDefaultPort is default value of the port.
PyTorchJobDefaultPort = 23456
// PyTorchJobDefaultRestartPolicy is default RestartPolicy for PyTorchReplicaSpec.
PyTorchJobDefaultRestartPolicy = RestartPolicyOnFailure
// PyTorchJobKind is the kind name.
PyTorchJobKind = "PyTorchJob"
// PyTorchJobPlural is the PyTorchPlural for pytorchJob.
PyTorchJobPlural = "pytorchjobs"
// PyTorchJobSingular is the singular for pytorchJob.
PyTorchJobSingular = "pytorchjob"
// PyTorchJobFrameworkName is the name of the ML Framework
PyTorchJobFrameworkName = "pytorch"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this have to be "PyTorch" too?

Copy link
Member Author

@tenzen-y tenzen-y Aug 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// PyTorchJobReplicaTypeMaster is the type of Master of distributed PyTorch
PyTorchJobReplicaTypeMaster ReplicaType = "Master"
// PyTorchJobReplicaTypeWorker is the type for workers of distributed PyTorch.
Expand Down Expand Up @@ -168,5 +168,5 @@ type PyTorchJobList struct {

func init() {
SchemeBuilder.Register(&PyTorchJob{}, &PyTorchJobList{})
SchemeBuilder.SchemeBuilder.Register(addPytorchDefaultingFuncs)
SchemeBuilder.SchemeBuilder.Register(addPyTorchDefaultingFuncs)
}
4 changes: 2 additions & 2 deletions pkg/apis/kubeflow.org/v1/pytorch_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ func validatePyTorchReplicaSpecs(specs map[ReplicaType]*ReplicaSpec) error {
msg := fmt.Sprintf("PyTorchJobSpec is not valid: Image is undefined in the container of %v", rType)
return fmt.Errorf(msg)
}
if container.Name == PytorchJobDefaultContainerName {
if container.Name == PyTorchJobDefaultContainerName {
defaultContainerPresent = true
}
}
//Make sure there has at least one container named "pytorch"
if !defaultContainerPresent {
msg := fmt.Sprintf("PyTorchJobSpec is not valid: There is no container named %s in %v", PytorchJobDefaultContainerName, rType)
msg := fmt.Sprintf("PyTorchJobSpec is not valid: There is no container named %s in %v", PyTorchJobDefaultContainerName, rType)
return fmt.Errorf(msg)
}
if rType == PyTorchJobReplicaTypeMaster {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller.v1/pytorch/envvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ func replicaName(jobName string, rtype kubeflowv1.ReplicaType, index int) string
func getPortFromPyTorchJob(job *kubeflowv1.PyTorchJob, rtype kubeflowv1.ReplicaType) (int32, error) {
containers := job.Spec.PyTorchReplicaSpecs[rtype].Template.Spec.Containers
for _, container := range containers {
if container.Name == kubeflowv1.PytorchJobDefaultContainerName {
if container.Name == kubeflowv1.PyTorchJobDefaultContainerName {
ports := container.Ports
for _, port := range ports {
if port.Name == kubeflowv1.PytorchJobDefaultPortName {
if port.Name == kubeflowv1.PyTorchJobDefaultPortName {
return port.ContainerPort, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/pytorch/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

func (r *PyTorchJobReconciler) ReconcileHPA(pytorchJob *kubeflowv1.PyTorchJob) error {
logger := r.Log.WithValues(kubeflowv1.PytorchJobSingular, pytorchJob.Name)
logger := r.Log.WithValues(kubeflowv1.PyTorchJobSingular, pytorchJob.Name)

if pytorchJob.Spec.ElasticPolicy == nil || pytorchJob.Spec.ElasticPolicy.Metrics == nil {
logger.V(1).Info(
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/pytorch/initcontainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func setInitContainer(obj interface{}, podTemplate *corev1.PodTemplateSpec,
if !ok {
return fmt.Errorf("%+v is not a type of PyTorchJob", obj)
}
logger := log.WithValues(kubeflowv1.PytorchJobSingular, types.NamespacedName{
logger := log.WithValues(kubeflowv1.PyTorchJobSingular, types.NamespacedName{
Namespace: pytorchJob.Namespace,
Name: pytorchJob.Name,
})
Expand Down
34 changes: 17 additions & 17 deletions pkg/controller.v1/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type PyTorchJobReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile
func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
logger := r.Log.WithValues(kubeflowv1.PytorchJobSingular, req.NamespacedName)
logger := r.Log.WithValues(kubeflowv1.PyTorchJobSingular, req.NamespacedName)

pytorchjob := &kubeflowv1.PyTorchJob{}
err := r.Get(ctx, req.NamespacedName, pytorchjob)
Expand All @@ -134,7 +134,7 @@ func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)

if err = kubeflowv1.ValidateV1PyTorchJob(pytorchjob); err != nil {
logger.Error(err, "PyTorchJob failed validation")
r.Recorder.Eventf(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedValidationReason),
r.Recorder.Eventf(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobFailedValidationReason),
"PyTorchJob failed validation because %s", err)
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -242,7 +242,7 @@ func (r *PyTorchJobReconciler) ControllerName() string {
}

func (r *PyTorchJobReconciler) GetAPIGroupVersionKind() schema.GroupVersionKind {
return kubeflowv1.GroupVersion.WithKind(kubeflowv1.PytorchJobKind)
return kubeflowv1.GroupVersion.WithKind(kubeflowv1.PyTorchJobKind)
}

func (r *PyTorchJobReconciler) GetAPIGroupVersion() schema.GroupVersion {
Expand All @@ -254,7 +254,7 @@ func (r *PyTorchJobReconciler) GetGroupNameLabelValue() string {
}

func (r *PyTorchJobReconciler) GetFrameworkName() string {
return kubeflowv1.PytorchJobFrameworkName
return kubeflowv1.PyTorchJobFrameworkName
}

func (r *PyTorchJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) {
Expand Down Expand Up @@ -392,18 +392,18 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
if rtype == kubeflowv1.PyTorchJobReplicaTypeMaster {
if running > 0 {
msg := fmt.Sprintf("PyTorchJob %s is running.", pytorchjob.Name)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRunningReason), msg)
}
// when master is succeed, the job is finished.
if expected == 0 {
msg := fmt.Sprintf("PyTorchJob %s is successfully completed.", pytorchjob.Name)
logrus.Info(msg)
r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg)
r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
return nil
}
Expand All @@ -417,36 +417,36 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
if expected == 0 || (pytorchjob.Spec.ElasticPolicy != nil && succeeded > 0) {
msg := fmt.Sprintf("PyTorchJob %s/%s successfully completed.",
pytorchjob.Namespace, pytorchjob.Name)
r.recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg)
r.recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
} else if running > 0 {
// Some workers are still running, leave a running condition.
msg := fmt.Sprintf("PyTorchJob %s/%s is running.",
pytorchjob.Namespace, pytorchjob.Name)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRunningReason), msg)
}
}
}

if failed > 0 && (specReplicas > succeeded+running) {
if spec.RestartPolicy != kubeflowv1.RestartPolicyNever {
msg := fmt.Sprintf("PyTorchJob %s is restarting because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype)
r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg)
r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRestartingReason), msg)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRestartingReason), msg)
trainingoperatorcommon.RestartedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
} else {
msg := fmt.Sprintf("PyTorchJob %s is failed because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype)
r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg)
r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobFailedReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg)
commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobFailedReason), msg)
trainingoperatorcommon.FailedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
}
}
Expand Down Expand Up @@ -505,11 +505,11 @@ func (r *PyTorchJobReconciler) SetClusterSpec(job interface{}, podTemplate *core
}

func (r *PyTorchJobReconciler) GetDefaultContainerName() string {
return kubeflowv1.PytorchJobDefaultContainerName
return kubeflowv1.PyTorchJobDefaultContainerName
}

func (r *PyTorchJobReconciler) GetDefaultContainerPortName() string {
return kubeflowv1.PytorchJobDefaultPortName
return kubeflowv1.PyTorchJobDefaultPortName
}

func (r *PyTorchJobReconciler) IsMasterRole(replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec,
Expand All @@ -528,7 +528,7 @@ func (r *PyTorchJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool
msg := fmt.Sprintf("PyTorchJob %s is created.", e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), msg)
commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobCreatedReason), msg)
return true
}
}
Loading