generated from kubernetes/kubernetes-template-project
-
Notifications
You must be signed in to change notification settings - Fork 300
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
- Loading branch information
Showing
22 changed files
with
1,628 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,3 +32,4 @@ integrations: | |
- "kubeflow.org/mpijob" | ||
- "ray.io/rayjob" | ||
- "jobset.x-k8s.io/jobset" | ||
- "kubeflow.org/pytorchjob" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package jobs | ||
|
||
// Reference the job framework integration packages to ensure linking. | ||
import ( | ||
_ "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/pytorchjob" | ||
) |
116 changes: 116 additions & 0 deletions
116
pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package pytorchjob | ||
|
||
import ( | ||
"context" | ||
|
||
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
|
||
"sigs.k8s.io/kueue/pkg/controller/jobframework" | ||
"sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" | ||
) | ||
|
||
var ( | ||
gvk = kftraining.SchemeGroupVersion.WithKind(kftraining.PytorchJobKind) | ||
FrameworkName = "kftraining.org/pytorchjob" | ||
) | ||
|
||
func init() { | ||
utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{ | ||
SetupIndexes: SetupIndexes, | ||
NewReconciler: NewReconciler, | ||
SetupWebhook: SetupPyTorchJobWebhook, | ||
JobType: &kftraining.PyTorchJob{}, | ||
AddToScheme: kftraining.AddToScheme, | ||
})) | ||
} | ||
|
||
// +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch | ||
// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch | ||
// +kubebuilder:rbac:groups=kubeflow.org,resources=pytorchjobs,verbs=get;list;watch;update;patch | ||
// +kubebuilder:rbac:groups=kubeflow.org,resources=pytorchjobs/status,verbs=get;update | ||
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete | ||
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch | ||
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/finalizers,verbs=update | ||
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch | ||
|
||
var NewReconciler = jobframework.NewGenericReconciler(func() jobframework.GenericJob { | ||
pytorchJobObj := &kftraining.PyTorchJob{} | ||
return &PyTorchJob{PyTorchJob: pytorchJobObj, KubeflowJob: kubeflowjob.NewKubeflowJob((*JobControl)(pytorchJobObj))} | ||
}, nil) | ||
|
||
func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error { | ||
return jobframework.SetupWorkloadOwnerIndex(ctx, indexer, gvk) | ||
} | ||
|
||
func GetWorkloadNameForPyTorchJob(jobName string) string { | ||
return jobframework.GetWorkloadNameForOwnerWithGVK(jobName, gvk) | ||
} | ||
|
||
type PyTorchJob struct { | ||
*kftraining.PyTorchJob | ||
*kubeflowjob.KubeflowJob | ||
} | ||
|
||
var _ jobframework.GenericJob = (*PyTorchJob)(nil) | ||
|
||
func fromObject(o runtime.Object) *PyTorchJob { | ||
pytorchJobObj := o.(*kftraining.PyTorchJob) | ||
return &PyTorchJob{PyTorchJob: pytorchJobObj, KubeflowJob: kubeflowjob.NewKubeflowJob((*JobControl)(pytorchJobObj))} | ||
} | ||
|
||
func (j *PyTorchJob) GetGVK() schema.GroupVersionKind { | ||
return gvk | ||
} | ||
|
||
func (j *PyTorchJob) Object() client.Object { | ||
return j.PyTorchJob | ||
} | ||
|
||
// PriorityClass calculates the priorityClass name needed for workload according to the following priorities: | ||
// 1. .spec.runPolicy.schedulingPolicy.priorityClass | ||
// 2. .spec.replicaSpecs[Master].template.spec.priorityClassName | ||
// 3. .spec.replicaSpecs[Worker].template.spec.priorityClassName | ||
// | ||
// This function is inspired by an analogous one in mpi-controller: | ||
// https://github.com/kubeflow/mpi-operator/blob/5946ef4157599a474ab82ff80e780d5c2546c9ee/pkg/controller/podgroup.go#L69-L72 | ||
func (j *PyTorchJob) PriorityClass() string { | ||
if j.Spec.RunPolicy.SchedulingPolicy != nil && len(j.Spec.RunPolicy.SchedulingPolicy.PriorityClass) != 0 { | ||
return j.Spec.RunPolicy.SchedulingPolicy.PriorityClass | ||
} else if l := j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster]; l != nil && len(l.Template.Spec.PriorityClassName) != 0 { | ||
return l.Template.Spec.PriorityClassName | ||
} else if w := j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker]; w != nil && len(w.Template.Spec.PriorityClassName) != 0 { | ||
return w.Template.Spec.PriorityClassName | ||
} | ||
return "" | ||
} | ||
|
||
type JobControl kftraining.PyTorchJob | ||
|
||
var _ kubeflowjob.KFJobControl = (*JobControl)(nil) | ||
|
||
func (c *JobControl) JobStatus() kftraining.JobStatus { | ||
return c.Status | ||
} | ||
|
||
func (c *JobControl) ReplicaSpecs() map[kftraining.ReplicaType]*kftraining.ReplicaSpec { | ||
return c.Spec.PyTorchReplicaSpecs | ||
} | ||
|
||
func (c *JobControl) RunPolicy() *kftraining.RunPolicy { | ||
return &c.Spec.RunPolicy | ||
} | ||
|
||
func (c *JobControl) OrderedReplicaTypes(replicaSpecs map[kftraining.ReplicaType]*kftraining.ReplicaSpec) []kftraining.ReplicaType { | ||
var result []kftraining.ReplicaType | ||
if _, ok := replicaSpecs[kftraining.PyTorchJobReplicaTypeMaster]; ok { | ||
result = append(result, kftraining.PyTorchJobReplicaTypeMaster) | ||
} | ||
if _, ok := replicaSpecs[kftraining.PyTorchJobReplicaTypeWorker]; ok { | ||
result = append(result, kftraining.PyTorchJobReplicaTypeWorker) | ||
} | ||
return result | ||
} |
Oops, something went wrong.