Skip to content

Commit

Permalink
Register KubeflowJob to the manager instead of PyTorchJob
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
  • Loading branch information
tenzen-y committed Aug 9, 2023
1 parent ac930dc commit fc23a88
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 72 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,3 @@ JOBSETROOT = $(shell $(GO_CMD) list -m -f "{{.Dir}}" sigs.k8s.io/jobset)
jobset-operator-crd:
mkdir -p $(PROJECT_DIR)/dep-crds/jobset-operator/
cp -f $(JOBSETROOT)/config/components/crd/bases/* $(PROJECT_DIR)/dep-crds/jobset-operator/

1 change: 1 addition & 0 deletions config/components/webhook/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ webhooks:
apiVersions:
- v1
operations:
- CREATE
- UPDATE
resources:
- pytorchjobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

var (
gvk = kftraining.SchemeGroupVersion.WithKind(kftraining.PytorchJobKind)
gvk = kftraining.SchemeGroupVersion.WithKind(kftraining.PyTorchJobKind)
FrameworkName = "kubeflow.org/pytorchjob"
)

Expand All @@ -54,34 +54,46 @@ func init() {
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch

var NewReconciler = jobframework.NewGenericReconciler(func() jobframework.GenericJob {
return &PyTorchJob{kubeflowjob.NewKubeflowJob((*JobControl)(&kftraining.PyTorchJob{}))}
return &kubeflowjob.KubeflowJob{KFJobControl: (*JobControl)(&kftraining.PyTorchJob{})}
}, nil)

func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
return jobframework.SetupWorkloadOwnerIndex(ctx, indexer, gvk)
type JobControl kftraining.PyTorchJob

var _ kubeflowjob.KFJobControl = (*JobControl)(nil)

func (j *JobControl) Object() client.Object {
return (*kftraining.PyTorchJob)(j)
}

func GetWorkloadNameForPyTorchJob(jobName string) string {
return jobframework.GetWorkloadNameForOwnerWithGVK(jobName, gvk)
func fromObject(o runtime.Object) *kubeflowjob.KubeflowJob {
return &kubeflowjob.KubeflowJob{KFJobControl: (*JobControl)(o.(*kftraining.PyTorchJob))}
}

type PyTorchJob struct {
*kubeflowjob.KubeflowJob
func (j *JobControl) GetGVK() schema.GroupVersionKind {
return gvk
}

var _ jobframework.GenericJob = (*PyTorchJob)(nil)
var _ jobframework.JobWithPriorityClass = (*PyTorchJob)(nil)
func (j *JobControl) RunPolicy() *kftraining.RunPolicy {
return &j.Spec.RunPolicy
}

func fromObject(o runtime.Object) *PyTorchJob {
return &PyTorchJob{kubeflowjob.NewKubeflowJob((*JobControl)(o.(*kftraining.PyTorchJob)))}
func (j *JobControl) ReplicaSpecs() map[kftraining.ReplicaType]*kftraining.ReplicaSpec {
return j.Spec.PyTorchReplicaSpecs
}

func (j *PyTorchJob) GetGVK() schema.GroupVersionKind {
return gvk
func (j *JobControl) JobStatus() kftraining.JobStatus {
return j.Status
}

func (j *PyTorchJob) Object() client.Object {
return (*kftraining.PyTorchJob)(j.KFJobControl.(*JobControl))
func (j *JobControl) OrderedReplicaTypes(replicaSpecs map[kftraining.ReplicaType]*kftraining.ReplicaSpec) []kftraining.ReplicaType {
result := make([]kftraining.ReplicaType, 0, 2)
if _, ok := replicaSpecs[kftraining.PyTorchJobReplicaTypeMaster]; ok {
result = append(result, kftraining.PyTorchJobReplicaTypeMaster)
}
if _, ok := replicaSpecs[kftraining.PyTorchJobReplicaTypeWorker]; ok {
result = append(result, kftraining.PyTorchJobReplicaTypeWorker)
}
return result
}

// PriorityClass calculates the priorityClass name needed for workload according to the following priorities:
Expand All @@ -91,42 +103,21 @@ func (j *PyTorchJob) Object() client.Object {
//
// This function is inspired by an analogous one in mpi-controller:
// https://github.com/kubeflow/mpi-operator/blob/5946ef4157599a474ab82ff80e780d5c2546c9ee/pkg/controller/podgroup.go#L69-L72
func (j *PyTorchJob) PriorityClass() string {
pytorchJob := j.Object().(*kftraining.PyTorchJob)

if pytorchJob.Spec.RunPolicy.SchedulingPolicy != nil && len(pytorchJob.Spec.RunPolicy.SchedulingPolicy.PriorityClass) != 0 {
return pytorchJob.Spec.RunPolicy.SchedulingPolicy.PriorityClass
} else if m := pytorchJob.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster]; m != nil && len(m.Template.Spec.PriorityClassName) != 0 {
func (j *JobControl) PriorityClass() string {
if j.Spec.RunPolicy.SchedulingPolicy != nil && len(j.Spec.RunPolicy.SchedulingPolicy.PriorityClass) != 0 {
return j.Spec.RunPolicy.SchedulingPolicy.PriorityClass
} else if m := j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster]; m != nil && len(m.Template.Spec.PriorityClassName) != 0 {
return m.Template.Spec.PriorityClassName
} else if w := pytorchJob.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker]; w != nil && len(w.Template.Spec.PriorityClassName) != 0 {
} else if w := j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker]; w != nil && len(w.Template.Spec.PriorityClassName) != 0 {
return w.Template.Spec.PriorityClassName
}
return ""
}

type JobControl kftraining.PyTorchJob

var _ kubeflowjob.KFJobControl = (*JobControl)(nil)

func (c *JobControl) JobStatus() kftraining.JobStatus {
return c.Status
}

func (c *JobControl) ReplicaSpecs() map[kftraining.ReplicaType]*kftraining.ReplicaSpec {
return c.Spec.PyTorchReplicaSpecs
}

func (c *JobControl) RunPolicy() *kftraining.RunPolicy {
return &c.Spec.RunPolicy
func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
return jobframework.SetupWorkloadOwnerIndex(ctx, indexer, gvk)
}

func (c *JobControl) OrderedReplicaTypes(replicaSpecs map[kftraining.ReplicaType]*kftraining.ReplicaSpec) []kftraining.ReplicaType {
result := make([]kftraining.ReplicaType, 0, 2)
if _, ok := replicaSpecs[kftraining.PyTorchJobReplicaTypeMaster]; ok {
result = append(result, kftraining.PyTorchJobReplicaTypeMaster)
}
if _, ok := replicaSpecs[kftraining.PyTorchJobReplicaTypeWorker]; ok {
result = append(result, kftraining.PyTorchJobReplicaTypeWorker)
}
return result
func GetWorkloadNameForPyTorchJob(jobName string) string {
return jobframework.GetWorkloadNameForOwnerWithGVK(jobName, gvk)
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (w *PyTorchJobWebhook) Default(ctx context.Context, obj runtime.Object) err
return nil
}

// +kubebuilder:webhook:path=/validate-kubeflow-org-v1-pytorchjob,mutating=false,failurePolicy=fail,sideEffects=None,groups=kubeflow.org,resources=pytorchjobs,verbs=update,versions=v1,name=vpytorchjob.kb.io,admissionReviewVersions=v1
// +kubebuilder:webhook:path=/validate-kubeflow-org-v1-pytorchjob,mutating=false,failurePolicy=fail,sideEffects=None,groups=kubeflow.org,resources=pytorchjobs,verbs=create;update,versions=v1,name=vpytorchjob.kb.io,admissionReviewVersions=v1

var _ webhook.CustomValidator = &PyTorchJobWebhook{}

Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/jobs/kubeflow/kubeflowjob/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,23 @@ package kubeflowjob

import (
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type KFJobControl interface {
// Object returns the KFJob interface.
Object() client.Object
// GetGVK returns the GroupVersionKind for the KFJob.
GetGVK() schema.GroupVersionKind
// RunPolicy returns the RunPolicy for the KFJob.
RunPolicy() *kftraining.RunPolicy
// ReplicaSpecs returns the ReplicaSpecs for the KFJob.
ReplicaSpecs() map[kftraining.ReplicaType]*kftraining.ReplicaSpec
// JobStatus returns the JobStatus for the KFJob.
JobStatus() kftraining.JobStatus
// OrderedReplicaTypes returns the ordered list of ReplicaTypes for the KFJob.
OrderedReplicaTypes(replicaSpecs map[kftraining.ReplicaType]*kftraining.ReplicaSpec) []kftraining.ReplicaType
// PriorityClass returns the KFJob's priority class name.
PriorityClass() string
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,58 +33,60 @@ import (
)

type KubeflowJob struct {
KFJobControl
}

func NewKubeflowJob(ctrl KFJobControl) *KubeflowJob {
return &KubeflowJob{KFJobControl: ctrl}
KFJobControl KFJobControl
}

var _ jobframework.GenericJob = (*KubeflowJob)(nil)
var _ jobframework.JobWithPriorityClass = (*KubeflowJob)(nil)

func (j *KubeflowJob) Object() client.Object {
return nil
return j.KFJobControl.Object()
}

func (j *KubeflowJob) IsSuspended() bool {
return j.RunPolicy().Suspend != nil && *j.RunPolicy().Suspend
return j.KFJobControl.RunPolicy().Suspend != nil && *j.KFJobControl.RunPolicy().Suspend
}

func (j *KubeflowJob) Suspend() {
j.RunPolicy().Suspend = pointer.Bool(true)
j.KFJobControl.RunPolicy().Suspend = pointer.Bool(true)
}

func (j *KubeflowJob) RunWithPodSetsInfo(podSetInfos []jobframework.PodSetInfo) {
j.RunPolicy().Suspend = pointer.Bool(false)
if len(podSetInfos) == 0 {
return
func (j *KubeflowJob) RunWithPodSetsInfo(podSetInfos []jobframework.PodSetInfo) error {
j.KFJobControl.RunPolicy().Suspend = pointer.Bool(false)
orderedReplicaTypes := j.KFJobControl.OrderedReplicaTypes(j.KFJobControl.ReplicaSpecs())

if len(podSetInfos) != len(orderedReplicaTypes) {
return jobframework.BadPodSetsInfoLenError(len(orderedReplicaTypes), len(podSetInfos))
}
// The node selectors are provided in the same order as the generated list of
// podSets, use the same ordering logic to restore them.
orderedReplicaTypes := j.OrderedReplicaTypes(j.ReplicaSpecs())
for index := range podSetInfos {
replicaType := orderedReplicaTypes[index]
info := podSetInfos[index]
replicaSpec := &j.ReplicaSpecs()[replicaType].Template.Spec
replicaSpec := &j.KFJobControl.ReplicaSpecs()[replicaType].Template.Spec
replicaSpec.NodeSelector = maps.MergeKeepFirst(info.NodeSelector, replicaSpec.NodeSelector)
}
return nil
}

func (j *KubeflowJob) RestorePodSetsInfo(podSetInfos []jobframework.PodSetInfo) {
orderedReplicaTypes := j.OrderedReplicaTypes(j.ReplicaSpecs())
func (j *KubeflowJob) RestorePodSetsInfo(podSetInfos []jobframework.PodSetInfo) bool {
orderedReplicaTypes := j.KFJobControl.OrderedReplicaTypes(j.KFJobControl.ReplicaSpecs())
changed := false
for index, info := range podSetInfos {
replicaType := orderedReplicaTypes[index]
replicaSpec := &j.ReplicaSpecs()[replicaType].Template.Spec
replicaSpec := &j.KFJobControl.ReplicaSpecs()[replicaType].Template.Spec
if !equality.Semantic.DeepEqual(replicaSpec.NodeSelector, info.NodeSelector) {
changed = true
replicaSpec.NodeSelector = maps.Clone(info.NodeSelector)
}
}
return changed
}

func (j *KubeflowJob) Finished() (metav1.Condition, bool) {
var conditionType kftraining.JobConditionType
var finished bool
for _, c := range j.JobStatus().Conditions {
for _, c := range j.KFJobControl.JobStatus().Conditions {
if (c.Type == kftraining.JobSucceeded || c.Type == kftraining.JobFailed) && c.Status == corev1.ConditionTrue {
conditionType = c.Type
finished = true
Expand All @@ -105,20 +107,20 @@ func (j *KubeflowJob) Finished() (metav1.Condition, bool) {
}

func (j *KubeflowJob) PodSets() []kueue.PodSet {
replicaTypes := j.OrderedReplicaTypes(j.ReplicaSpecs())
replicaTypes := j.KFJobControl.OrderedReplicaTypes(j.KFJobControl.ReplicaSpecs())
podSets := make([]kueue.PodSet, len(replicaTypes))
for index, replicaType := range replicaTypes {
podSets[index] = kueue.PodSet{
Name: strings.ToLower(string(replicaType)),
Template: *j.ReplicaSpecs()[replicaType].Template.DeepCopy(),
Count: podsCount(j.ReplicaSpecs(), replicaType),
Template: *j.KFJobControl.ReplicaSpecs()[replicaType].Template.DeepCopy(),
Count: podsCount(j.KFJobControl.ReplicaSpecs(), replicaType),
}
}
return podSets
}

func (j *KubeflowJob) IsActive() bool {
for _, replicaStatus := range j.JobStatus().ReplicaStatuses {
for _, replicaStatus := range j.KFJobControl.JobStatus().ReplicaStatuses {
if replicaStatus.Active != 0 {
return true
}
Expand All @@ -127,7 +129,7 @@ func (j *KubeflowJob) IsActive() bool {
}

func (j *KubeflowJob) PodsReady() bool {
for _, c := range j.JobStatus().Conditions {
for _, c := range j.KFJobControl.JobStatus().Conditions {
if c.Type == kftraining.JobRunning && c.Status == corev1.ConditionTrue {
return true
}
Expand All @@ -136,7 +138,11 @@ func (j *KubeflowJob) PodsReady() bool {
}

func (j *KubeflowJob) GetGVK() schema.GroupVersionKind {
return schema.GroupVersionKind{}
return j.KFJobControl.GetGVK()
}

func (j *KubeflowJob) PriorityClass() string {
return j.KFJobControl.PriorityClass()
}

func podsCount(replicaSpecs map[kftraining.ReplicaType]*kftraining.ReplicaSpec, replicaType kftraining.ReplicaType) int32 {
Expand Down

0 comments on commit fc23a88

Please sign in to comment.