Replace Pytorch with PyTorch
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
tenzen-y committed Aug 1, 2023
1 parent de7c73e commit 66bd9bd
Showing 9 changed files with 75 additions and 75 deletions.
22 changes: 11 additions & 11 deletions pkg/apis/kubeflow.org/v1/pytorch_defaults.go

@@ -23,15 +23,15 @@ var (
     DefaultNprocPerNode = "auto"
 )

-func addPytorchDefaultingFuncs(scheme *runtime.Scheme) error {
+func addPyTorchDefaultingFuncs(scheme *runtime.Scheme) error {
     return RegisterDefaults(scheme)
 }

-// setPytorchDefaultPort sets the default ports for pytorch container.
-func setPytorchDefaultPort(spec *corev1.PodSpec) {
-    index := getDefaultContainerIndex(spec, PytorchJobDefaultContainerName)
-    if ok := hasDefaultPort(spec, index, PytorchJobDefaultPortName); !ok {
-        setDefaultPort(spec, PytorchJobDefaultPortName, PytorchJobDefaultPort, index)
+// setPyTorchDefaultPort sets the default ports for pytorch container.
+func setPyTorchDefaultPort(spec *corev1.PodSpec) {
+    index := getDefaultContainerIndex(spec, PyTorchJobDefaultContainerName)
+    if ok := hasDefaultPort(spec, index, PyTorchJobDefaultPortName); !ok {
+        setDefaultPort(spec, PyTorchJobDefaultPortName, PyTorchJobDefaultPort, index)
     }
 }

@@ -54,8 +54,8 @@ func setElasticPolicy(pytorchJob *PyTorchJob) {
     }
 }

-// setPytorchTypeNamesToCamelCase sets the name of all replica types from any case to correct case.
-func setPytorchTypeNamesToCamelCase(pytorchJob *PyTorchJob) {
+// setPyTorchTypeNamesToCamelCase sets the name of all replica types from any case to correct case.
+func setPyTorchTypeNamesToCamelCase(pytorchJob *PyTorchJob) {
     replicaTypes := []ReplicaType{
         PyTorchJobReplicaTypeMaster,
         PyTorchJobReplicaTypeWorker,
@@ -81,12 +81,12 @@ func SetDefaults_PyTorchJob(job *PyTorchJob) {
     }

     // Update the key of PyTorchReplicaSpecs to camel case.
-    setPytorchTypeNamesToCamelCase(job)
+    setPyTorchTypeNamesToCamelCase(job)

     for _, spec := range job.Spec.PyTorchReplicaSpecs {
         setDefaultReplicas(spec, 1)
-        setDefaultRestartPolicy(spec, PytorchJobDefaultRestartPolicy)
-        setPytorchDefaultPort(&spec.Template.Spec)
+        setDefaultRestartPolicy(spec, PyTorchJobDefaultRestartPolicy)
+        setPyTorchDefaultPort(&spec.Template.Spec)
     }
     // Set default elastic policy.
     setElasticPolicy(job)
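
For orientation, a minimal sketch of what the defaulting path above now does. This example is not part of the commit: the import path (github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1, aliased kubeflowv1, plus k8s.io/api/core/v1 as corev1) and the job definition are assumptions for illustration.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"

    kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

func main() {
    // Hypothetical worker-only job that omits replicas, restart policy, and ports.
    job := &kubeflowv1.PyTorchJob{
        Spec: kubeflowv1.PyTorchJobSpec{
            PyTorchReplicaSpecs: map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{
                kubeflowv1.PyTorchJobReplicaTypeWorker: {
                    Template: corev1.PodTemplateSpec{
                        Spec: corev1.PodSpec{
                            Containers: []corev1.Container{{
                                Name:  kubeflowv1.PyTorchJobDefaultContainerName, // "pytorch"
                                Image: "pytorch/pytorch:latest",
                            }},
                        },
                    },
                },
            },
        },
    }

    kubeflowv1.SetDefaults_PyTorchJob(job)

    worker := job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker]
    fmt.Println(*worker.Replicas)                         // 1 (setDefaultReplicas)
    fmt.Println(worker.RestartPolicy)                     // OnFailure (PyTorchJobDefaultRestartPolicy)
    fmt.Println(worker.Template.Spec.Containers[0].Ports) // pytorchjob-port/23456 (setPyTorchDefaultPort)
}

Since addPyTorchDefaultingFuncs registers these defaults with the runtime scheme, the operator applies the same filling-in when it decodes a submitted PyTorchJob.
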
34 changes: 17 additions & 17 deletions pkg/apis/kubeflow.org/v1/pytorch_types.go

@@ -20,23 +20,23 @@ import (
 )

 const (
-    // PytorchJobDefaultPortName is name of the port used to communicate between Master and
+    // PyTorchJobDefaultPortName is name of the port used to communicate between Master and
     // workers.
-    PytorchJobDefaultPortName = "pytorchjob-port"
-    // PytorchJobDefaultContainerName is the name of the PyTorchJob container.
-    PytorchJobDefaultContainerName = "pytorch"
-    // PytorchJobDefaultPort is default value of the port.
-    PytorchJobDefaultPort = 23456
-    // PytorchJobDefaultRestartPolicy is default RestartPolicy for PyTorchReplicaSpec.
-    PytorchJobDefaultRestartPolicy = RestartPolicyOnFailure
-    // PytorchJobKind is the kind name.
-    PytorchJobKind = "PyTorchJob"
-    // PytorchJobPlural is the PytorchPlural for pytorchJob.
-    PytorchJobPlural = "pytorchjobs"
-    // PytorchJobSingular is the singular for pytorchJob.
-    PytorchJobSingular = "pytorchjob"
-    // PytorchJobFrameworkName is the name of the ML Framework
-    PytorchJobFrameworkName = "pytorch"
+    PyTorchJobDefaultPortName = "pytorchjob-port"
+    // PyTorchJobDefaultContainerName is the name of the PyTorchJob container.
+    PyTorchJobDefaultContainerName = "pytorch"
+    // PyTorchJobDefaultPort is default value of the port.
+    PyTorchJobDefaultPort = 23456
+    // PyTorchJobDefaultRestartPolicy is default RestartPolicy for PyTorchReplicaSpec.
+    PyTorchJobDefaultRestartPolicy = RestartPolicyOnFailure
+    // PyTorchJobKind is the kind name.
+    PyTorchJobKind = "PyTorchJob"
+    // PyTorchJobPlural is the PyTorchPlural for pytorchJob.
+    PyTorchJobPlural = "pytorchjobs"
+    // PyTorchJobSingular is the singular for pytorchJob.
+    PyTorchJobSingular = "pytorchjob"
+    // PyTorchJobFrameworkName is the name of the ML Framework
+    PyTorchJobFrameworkName = "pytorch"
     // PyTorchJobReplicaTypeMaster is the type of Master of distributed PyTorch
     PyTorchJobReplicaTypeMaster ReplicaType = "Master"
     // PyTorchJobReplicaTypeWorker is the type for workers of distributed PyTorch.
@@ -168,5 +168,5 @@ type PyTorchJobList struct {

 func init() {
     SchemeBuilder.Register(&PyTorchJob{}, &PyTorchJobList{})
-    SchemeBuilder.SchemeBuilder.Register(addPytorchDefaultingFuncs)
+    SchemeBuilder.SchemeBuilder.Register(addPyTorchDefaultingFuncs)
 }
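
Every constant above is exported, so this rename is a compile-time breaking change for downstream Go code that imports the package, even though no value changes. A small sketch of post-rename consumer code (hypothetical, with the same assumed import path as the sketch above):

package main

import (
    "fmt"

    kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

func main() {
    // Old spellings such as kubeflowv1.PytorchJobKind no longer compile;
    // the renamed identifiers keep the same values.
    fmt.Println(kubeflowv1.PyTorchJobKind)                 // PyTorchJob
    fmt.Println(kubeflowv1.PyTorchJobPlural)               // pytorchjobs
    fmt.Println(kubeflowv1.PyTorchJobDefaultContainerName) // pytorch
    fmt.Println(kubeflowv1.PyTorchJobDefaultPort)          // 23456
}
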
4 changes: 2 additions & 2 deletions pkg/apis/kubeflow.org/v1/pytorch_validation.go

@@ -70,13 +70,13 @@ func validatePyTorchReplicaSpecs(specs map[ReplicaType]*ReplicaSpec) error {
             msg := fmt.Sprintf("PyTorchJobSpec is not valid: Image is undefined in the container of %v", rType)
             return fmt.Errorf(msg)
         }
-        if container.Name == PytorchJobDefaultContainerName {
+        if container.Name == PyTorchJobDefaultContainerName {
             defaultContainerPresent = true
         }
     }
     //Make sure there has at least one container named "pytorch"
     if !defaultContainerPresent {
-        msg := fmt.Sprintf("PyTorchJobSpec is not valid: There is no container named %s in %v", PytorchJobDefaultContainerName, rType)
+        msg := fmt.Sprintf("PyTorchJobSpec is not valid: There is no container named %s in %v", PyTorchJobDefaultContainerName, rType)
         return fmt.Errorf(msg)
     }
     if rType == PyTorchJobReplicaTypeMaster {
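
The behavior of the container-name check is unchanged; only the spelling of the constant moves to PyTorchJobDefaultContainerName. A sketch of the failure mode (hypothetical job; the error text follows the Sprintf above, and validations outside this hunk may impose further requirements):

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

func main() {
    // Hypothetical worker pod whose only container is named "trainer", not "pytorch".
    job := &kubeflowv1.PyTorchJob{
        ObjectMeta: metav1.ObjectMeta{Name: "pytorch-demo"},
        Spec: kubeflowv1.PyTorchJobSpec{
            PyTorchReplicaSpecs: map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{
                kubeflowv1.PyTorchJobReplicaTypeWorker: {
                    Template: corev1.PodTemplateSpec{
                        Spec: corev1.PodSpec{
                            Containers: []corev1.Container{{
                                Name:  "trainer",
                                Image: "pytorch/pytorch:latest",
                            }},
                        },
                    },
                },
            },
        },
    }

    // Expected to report something like:
    //   PyTorchJobSpec is not valid: There is no container named pytorch in Worker
    if err := kubeflowv1.ValidateV1PyTorchJob(job); err != nil {
        fmt.Println(err)
    }
}
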
4 changes: 2 additions & 2 deletions pkg/controller.v1/pytorch/envvar.go

@@ -152,10 +152,10 @@ func replicaName(jobName string, rtype kubeflowv1.ReplicaType, index int) string
 func getPortFromPyTorchJob(job *kubeflowv1.PyTorchJob, rtype kubeflowv1.ReplicaType) (int32, error) {
     containers := job.Spec.PyTorchReplicaSpecs[rtype].Template.Spec.Containers
     for _, container := range containers {
-        if container.Name == kubeflowv1.PytorchJobDefaultContainerName {
+        if container.Name == kubeflowv1.PyTorchJobDefaultContainerName {
             ports := container.Ports
             for _, port := range ports {
-                if port.Name == kubeflowv1.PytorchJobDefaultPortName {
+                if port.Name == kubeflowv1.PyTorchJobDefaultPortName {
                     return port.ContainerPort, nil
                 }
             }
2 changes: 1 addition & 1 deletion pkg/controller.v1/pytorch/hpa.go

@@ -31,7 +31,7 @@ import (
 )

 func (r *PyTorchJobReconciler) ReconcileHPA(pytorchJob *kubeflowv1.PyTorchJob) error {
-    logger := r.Log.WithValues(kubeflowv1.PytorchJobSingular, pytorchJob.Name)
+    logger := r.Log.WithValues(kubeflowv1.PyTorchJobSingular, pytorchJob.Name)

     if pytorchJob.Spec.ElasticPolicy == nil || pytorchJob.Spec.ElasticPolicy.Metrics == nil {
         logger.V(1).Info(
2 changes: 1 addition & 1 deletion pkg/controller.v1/pytorch/initcontainer.go

@@ -108,7 +108,7 @@ func setInitContainer(obj interface{}, podTemplate *corev1.PodTemplateSpec,
     if !ok {
         return fmt.Errorf("%+v is not a type of PyTorchJob", obj)
     }
-    logger := log.WithValues(kubeflowv1.PytorchJobSingular, types.NamespacedName{
+    logger := log.WithValues(kubeflowv1.PyTorchJobSingular, types.NamespacedName{
         Namespace: pytorchJob.Namespace,
         Name:      pytorchJob.Name,
     })
34 changes: 17 additions & 17 deletions pkg/controller.v1/pytorch/pytorchjob_controller.go

@@ -123,7 +123,7 @@ type PyTorchJobReconciler struct {
 // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile
 func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
     _ = log.FromContext(ctx)
-    logger := r.Log.WithValues(kubeflowv1.PytorchJobSingular, req.NamespacedName)
+    logger := r.Log.WithValues(kubeflowv1.PyTorchJobSingular, req.NamespacedName)

     pytorchjob := &kubeflowv1.PyTorchJob{}
     err := r.Get(ctx, req.NamespacedName, pytorchjob)
@@ -134,7 +134,7 @@ func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)

     if err = kubeflowv1.ValidateV1PyTorchJob(pytorchjob); err != nil {
         logger.Error(err, "PyTorchJob failed validation")
-        r.Recorder.Eventf(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedValidationReason),
+        r.Recorder.Eventf(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobFailedValidationReason),
             "PyTorchJob failed validation because %s", err)
         return ctrl.Result{}, err
     }
@@ -242,7 +242,7 @@ func (r *PyTorchJobReconciler) ControllerName() string {
 }

 func (r *PyTorchJobReconciler) GetAPIGroupVersionKind() schema.GroupVersionKind {
-    return kubeflowv1.GroupVersion.WithKind(kubeflowv1.PytorchJobKind)
+    return kubeflowv1.GroupVersion.WithKind(kubeflowv1.PyTorchJobKind)
 }

 func (r *PyTorchJobReconciler) GetAPIGroupVersion() schema.GroupVersion {
@@ -254,7 +254,7 @@ func (r *PyTorchJobReconciler) GetGroupNameLabelValue() string {
 }

 func (r *PyTorchJobReconciler) GetFrameworkName() string {
-    return kubeflowv1.PytorchJobFrameworkName
+    return kubeflowv1.PyTorchJobFrameworkName
 }

 func (r *PyTorchJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) {
@@ -392,18 +392,18 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
         if rtype == kubeflowv1.PyTorchJobReplicaTypeMaster {
             if running > 0 {
                 msg := fmt.Sprintf("PyTorchJob %s is running.", pytorchjob.Name)
-                commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg)
+                commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRunningReason), msg)
             }
             // when master is succeed, the job is finished.
             if expected == 0 {
                 msg := fmt.Sprintf("PyTorchJob %s is successfully completed.", pytorchjob.Name)
                 logrus.Info(msg)
-                r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg)
+                r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
                 if jobStatus.CompletionTime == nil {
                     now := metav1.Now()
                     jobStatus.CompletionTime = &now
                 }
-                commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg)
+                commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
                 trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
                 return nil
             }
@@ -417,36 +417,36 @@
             if expected == 0 || (pytorchjob.Spec.ElasticPolicy != nil && succeeded > 0) {
                 msg := fmt.Sprintf("PyTorchJob %s/%s successfully completed.",
                     pytorchjob.Namespace, pytorchjob.Name)
-                r.recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg)
+                r.recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
                 if jobStatus.CompletionTime == nil {
                     now := metav1.Now()
                     jobStatus.CompletionTime = &now
                 }
-                commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg)
+                commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobSucceededReason), msg)
                 trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
             } else if running > 0 {
                 // Some workers are still running, leave a running condition.
                 msg := fmt.Sprintf("PyTorchJob %s/%s is running.",
                     pytorchjob.Namespace, pytorchjob.Name)
-                commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg)
+                commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRunningReason), msg)
             }
         }
     }

     if failed > 0 && (specReplicas > succeeded+running) {
         if spec.RestartPolicy != kubeflowv1.RestartPolicyNever {
             msg := fmt.Sprintf("PyTorchJob %s is restarting because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype)
-            r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg)
-            commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg)
+            r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRestartingReason), msg)
+            commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobRestartingReason), msg)
             trainingoperatorcommon.RestartedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
         } else {
             msg := fmt.Sprintf("PyTorchJob %s is failed because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype)
-            r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg)
+            r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobFailedReason), msg)
             if jobStatus.CompletionTime == nil {
                 now := metav1.Now()
                 jobStatus.CompletionTime = &now
             }
-            commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg)
+            commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobFailedReason), msg)
             trainingoperatorcommon.FailedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
         }
     }
@@ -505,11 +505,11 @@ func (r *PyTorchJobReconciler) SetClusterSpec(job interface{}, podTemplate *core
 }

 func (r *PyTorchJobReconciler) GetDefaultContainerName() string {
-    return kubeflowv1.PytorchJobDefaultContainerName
+    return kubeflowv1.PyTorchJobDefaultContainerName
 }

 func (r *PyTorchJobReconciler) GetDefaultContainerPortName() string {
-    return kubeflowv1.PytorchJobDefaultPortName
+    return kubeflowv1.PyTorchJobDefaultPortName
 }

 func (r *PyTorchJobReconciler) IsMasterRole(replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec,
@@ -528,7 +528,7 @@ func (r *PyTorchJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool
         msg := fmt.Sprintf("PyTorchJob %s is created.", e.Object.GetName())
         logrus.Info(msg)
         trainingoperatorcommon.CreatedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName())
-        commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), msg)
+        commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PyTorchJobKind, commonutil.JobCreatedReason), msg)
         return true
     }
 }
