Skip to content

Commit

Permalink
Support kubeflow.org/pytorchjob
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
  • Loading branch information
tenzen-y committed Jul 18, 2023
1 parent 3e9b6e3 commit 61b2e25
Show file tree
Hide file tree
Showing 21 changed files with 1,616 additions and 2 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ test: generate gotestsum ## Run tests.
$(GOTESTSUM) --junitfile $(ARTIFACTS)/junit.xml -- $(GO_TEST_FLAGS) $(shell $(GO_CMD) list ./... | grep -v '/test/') -coverprofile $(ARTIFACTS)/cover.out

.PHONY: test-integration
test-integration: manifests generate envtest ginkgo mpi-operator-crd ray-operator-crd jobset-operator-crd ## Run tests.
test-integration: manifests generate envtest ginkgo mpi-operator-crd ray-operator-crd jobset-operator-crd training-operator-crd ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" \
$(GINKGO) $(GINKGO_ARGS) --junit-report=junit.xml --output-dir=$(ARTIFACTS) -v $(INTEGRATION_TARGET)

Expand Down Expand Up @@ -320,6 +320,12 @@ mpi-operator-crd:
mkdir -p $(PROJECT_DIR)/dep-crds/mpi-operator/
cp -f $(MPIROOT)/manifests/base/* $(PROJECT_DIR)/dep-crds/mpi-operator/

# Directory of the github.com/kubeflow/training-operator module as reported by `go list -m`.
# Recursively expanded (`=`), so the shell command runs only when the variable is referenced.
TRAININGROOT = $(shell $(GO_CMD) list -m -f "{{.Dir}}" github.com/kubeflow/training-operator)
# Copy the training-operator CRD manifests from the module directory into
# $(PROJECT_DIR)/dep-crds/training-operator/ (prerequisite of test-integration).
.PHONY: training-operator-crd
training-operator-crd:
mkdir -p $(PROJECT_DIR)/dep-crds/training-operator/
cp -f $(TRAININGROOT)/manifests/base/crds/* $(PROJECT_DIR)/dep-crds/training-operator/

RAYROOT = $(shell $(GO_CMD) list -m -f "{{.Dir}}" github.com/ray-project/kuberay/ray-operator)
.PHONY: ray-operator-crd
ray-operator-crd:
Expand Down
1 change: 1 addition & 0 deletions apis/config/v1beta1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,5 +209,6 @@ type Integrations struct {
// - "kubeflow.org/mpijob"
// - "ray.io/rayjob"
// - "jobset.x-k8s.io/jobset"
// - "kubeflow.org/pytorchjob"
Frameworks []string `json:"frameworks,omitempty"`
}
17 changes: 17 additions & 0 deletions charts/kueue/templates/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,23 @@ rules:
verbs:
- get
- update
- apiGroups:
- kubeflow.org
resources:
- pytorchjobs
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- kubeflow.org
resources:
- pytorchjobs/status
verbs:
- get
- update
- apiGroups:
- kueue.x-k8s.io
resources:
Expand Down
1 change: 1 addition & 0 deletions config/components/manager/controller_manager_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ integrations:
- "kubeflow.org/mpijob"
- "ray.io/rayjob"
- "jobset.x-k8s.io/jobset"
- "kubeflow.org/pytorchjob"
17 changes: 17 additions & 0 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ rules:
verbs:
- get
- update
- apiGroups:
- kubeflow.org
resources:
- pytorchjobs
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- kubeflow.org
resources:
- pytorchjobs/status
verbs:
- get
- update
- apiGroups:
- kueue.x-k8s.io
resources:
Expand Down
38 changes: 38 additions & 0 deletions config/components/webhook/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ webhooks:
resources:
- jobsets
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
service:
name: webhook-service
namespace: system
path: /mutate-kubeflow-org-v1-pytorchjob
failurePolicy: Fail
name: mpytorchjob.kb.io
rules:
- apiGroups:
- kubeflow.org
apiVersions:
- v1
operations:
- CREATE
resources:
- pytorchjobs
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
Expand Down Expand Up @@ -182,6 +201,25 @@ webhooks:
resources:
- jobsets
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
service:
name: webhook-service
namespace: system
path: /validate-kubeflow-org-v1-pytorchjob
failurePolicy: Fail
name: vpytorchjob.kb.io
rules:
- apiGroups:
- kubeflow.org
apiVersions:
- v1
operations:
- UPDATE
resources:
- pytorchjobs
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/google/go-cmp v0.5.9
github.com/kubeflow/common v0.4.7
github.com/kubeflow/mpi-operator v0.4.0
github.com/kubeflow/training-operator v1.6.0
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.8
github.com/open-policy-agent/cert-controller v0.7.1-0.20230527042005-3b09cd39622f
Expand Down Expand Up @@ -61,6 +62,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand All @@ -83,3 +85,5 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

replace github.com/kubeflow/training-operator v1.6.0 => github.com/tenzen-y/training-operator v1.3.0-rc.1.0.20230717233919-1ed3e8e55322
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ github.com/ray-project/kuberay/ray-operator v0.0.0-20230613204710-aeed3cdcbdcc h
github.com/ray-project/kuberay/ray-operator v0.0.0-20230613204710-aeed3cdcbdcc/go.mod h1:2auArgwD9dXXJz1oc7SqQ4U/rHdpwnrBwG98kr8OWXA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
Expand All @@ -167,6 +169,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/tenzen-y/training-operator v1.3.0-rc.1.0.20230717233919-1ed3e8e55322 h1:vnhvmZXmAzNHMDg/sgN3HY84R7YBC82HhfAn1ElvswM=
github.com/tenzen-y/training-operator v1.3.0-rc.1.0.20230717233919-1ed3e8e55322/go.mod h1:aZzjgAj3+X1go6pnl3SMUJECWLX4w/bjNnPb9Nb3ns0=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
Expand Down Expand Up @@ -232,6 +236,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
2 changes: 1 addition & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ integrations:
{
name: "bad integrations config",
configFile: badIntegrationsConfig,
wantError: fmt.Errorf("integrations.frameworks: Unsupported value: \"unregistered/jobframework\": supported values: \"batch/job\", \"jobset.x-k8s.io/jobset\", \"kubeflow.org/mpijob\", \"ray.io/rayjob\""),
wantError: fmt.Errorf("integrations.frameworks: Unsupported value: \"unregistered/jobframework\": supported values: \"batch/job\", \"jobset.x-k8s.io/jobset\", \"kubeflow.org/mpijob\", \"kubeflow.org/pytorchjob\", \"ray.io/rayjob\""),
},
}

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package jobs
import (
_ "sigs.k8s.io/kueue/pkg/controller/jobs/job"
_ "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
_ "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs"
_ "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
_ "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
)
6 changes: 6 additions & 0 deletions pkg/controller/jobs/kubeflow/jobs/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package jobs

// Reference the job framework integration packages to ensure linking.
import (
_ "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/pytorchjob"
)
116 changes: 116 additions & 0 deletions pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package pytorchjob

import (
"context"

kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob"
)

var (
// gvk is the GroupVersionKind of PyTorchJob, built from the training-operator
// scheme group/version and kind constant. It is used for the workload owner
// index, workload naming, and GetGVK.
gvk = kftraining.SchemeGroupVersion.WithKind(kftraining.PytorchJobKind)
// FrameworkName is the name under which this integration is registered with
// the job framework in init.
FrameworkName = "kubeflow.org/pytorchjob"
)

func init() {
utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{
SetupIndexes: SetupIndexes,
NewReconciler: NewReconciler,
SetupWebhook: SetupPyTorchJobWebhook,
JobType: &kftraining.PyTorchJob{},
AddToScheme: kftraining.AddToScheme,
}))
}

// +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
// +kubebuilder:rbac:groups=kubeflow.org,resources=pytorchjobs,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=kubeflow.org,resources=pytorchjobs/status,verbs=get;update
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/finalizers,verbs=update
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch

// NewReconciler is the reconciler constructor produced by
// jobframework.NewGenericReconciler. The factory passed to it returns a
// PyTorchJob wrapper around a freshly allocated, empty kftraining.PyTorchJob;
// the same underlying object backs both the embedded job and the KubeflowJob
// adapter.
var NewReconciler = jobframework.NewGenericReconciler(func() jobframework.GenericJob {
	empty := new(kftraining.PyTorchJob)
	return &PyTorchJob{
		PyTorchJob:  empty,
		KubeflowJob: kubeflowjob.NewKubeflowJob((*JobControl)(empty)),
	}
}, nil)

// SetupIndexes registers the workload owner index for the PyTorchJob gvk on
// the given indexer by delegating to jobframework.SetupWorkloadOwnerIndex, and
// returns that call's error.
func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
return jobframework.SetupWorkloadOwnerIndex(ctx, indexer, gvk)
}

// GetWorkloadNameForPyTorchJob returns the workload name that
// jobframework.GetWorkloadNameForOwnerWithGVK derives for a PyTorchJob with the
// given name.
func GetWorkloadNameForPyTorchJob(jobName string) string {
return jobframework.GetWorkloadNameForOwnerWithGVK(jobName, gvk)
}

// PyTorchJob adapts a kubeflow PyTorchJob to the job framework. It embeds the
// API object itself together with a kubeflowjob.KubeflowJob, so the methods of
// both are promoted onto this type.
type PyTorchJob struct {
// Embedded training-operator API object.
*kftraining.PyTorchJob
// Embedded shared Kubeflow job helper; constructed from a JobControl view of
// the same API object (see the constructors in this file).
*kubeflowjob.KubeflowJob
}

var _ jobframework.GenericJob = (*PyTorchJob)(nil)

// fromObject wraps o in a PyTorchJob adapter. o must hold a
// *kftraining.PyTorchJob; the type assertion is unchecked and panics for any
// other dynamic type.
func fromObject(o runtime.Object) *PyTorchJob {
	raw := o.(*kftraining.PyTorchJob)
	wrapped := PyTorchJob{PyTorchJob: raw}
	wrapped.KubeflowJob = kubeflowjob.NewKubeflowJob((*JobControl)(raw))
	return &wrapped
}

// GetGVK returns the package-level GroupVersionKind for PyTorchJob.
func (j *PyTorchJob) GetGVK() schema.GroupVersionKind {
return gvk
}

// Object returns the embedded *kftraining.PyTorchJob as a client.Object.
func (j *PyTorchJob) Object() client.Object {
return j.PyTorchJob
}

// PriorityClass returns the priority class name to use for the workload. The
// first non-empty value among the following wins, in this order:
//  1. Spec.RunPolicy.SchedulingPolicy.PriorityClass (when SchedulingPolicy is set)
//  2. Spec.PyTorchReplicaSpecs[Master].Template.Spec.PriorityClassName
//  3. Spec.PyTorchReplicaSpecs[Worker].Template.Spec.PriorityClassName
//
// An empty string is returned when none of them is set.
//
// This function is inspired by an analogous one in mpi-controller:
// https://github.com/kubeflow/mpi-operator/blob/5946ef4157599a474ab82ff80e780d5c2546c9ee/pkg/controller/podgroup.go#L69-L72
func (j *PyTorchJob) PriorityClass() string {
	if policy := j.Spec.RunPolicy.SchedulingPolicy; policy != nil && policy.PriorityClass != "" {
		return policy.PriorityClass
	}

	// Fall back to the pod templates, Master before Worker.
	for _, replicaType := range []kftraining.ReplicaType{
		kftraining.PyTorchJobReplicaTypeMaster,
		kftraining.PyTorchJobReplicaTypeWorker,
	} {
		replica := j.Spec.PyTorchReplicaSpecs[replicaType]
		if replica == nil {
			continue
		}
		if name := replica.Template.Spec.PriorityClassName; name != "" {
			return name
		}
	}
	return ""
}

// JobControl is a defined type over kftraining.PyTorchJob that carries the
// accessor methods required by kubeflowjob.KFJobControl. A *kftraining.PyTorchJob
// is converted to *JobControl in place, so both refer to the same object.
type JobControl kftraining.PyTorchJob

var _ kubeflowjob.KFJobControl = (*JobControl)(nil)

// JobStatus returns the job's Status field by value.
func (c *JobControl) JobStatus() kftraining.JobStatus {
return c.Status
}

// ReplicaSpecs returns the job's Spec.PyTorchReplicaSpecs map (not a copy).
func (c *JobControl) ReplicaSpecs() map[kftraining.ReplicaType]*kftraining.ReplicaSpec {
return c.Spec.PyTorchReplicaSpecs
}

// RunPolicy returns a pointer to the job's Spec.RunPolicy, so changes made
// through the returned pointer modify the underlying job.
func (c *JobControl) RunPolicy() *kftraining.RunPolicy {
return &c.Spec.RunPolicy
}

// OrderedReplicaTypes lists the replica types that have an entry in
// replicaSpecs, always in the fixed order Master, then Worker. Any other keys
// in the map are ignored. The result is nil when neither type is present.
func (c *JobControl) OrderedReplicaTypes(replicaSpecs map[kftraining.ReplicaType]*kftraining.ReplicaSpec) []kftraining.ReplicaType {
	var ordered []kftraining.ReplicaType
	for _, replicaType := range []kftraining.ReplicaType{
		kftraining.PyTorchJobReplicaTypeMaster,
		kftraining.PyTorchJobReplicaTypeWorker,
	} {
		if _, present := replicaSpecs[replicaType]; present {
			ordered = append(ordered, replicaType)
		}
	}
	return ordered
}
Loading

0 comments on commit 61b2e25

Please sign in to comment.