Commit

Adapt new integration test framework
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
tenzen-y committed Aug 14, 2023
1 parent 37201a3 commit 86c5b32
Showing 3 changed files with 37 additions and 38 deletions.
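The diff below makes two recurring changes: calls into the deprecated k8s.io/utils/pointer helpers are replaced with their generic k8s.io/utils/ptr equivalents, and the integration tests move from the framework's single Setup() call to the new Init() plus RunManager() flow. As a reading aid, here is a minimal, self-contained sketch of the pointer-to-ptr substitution; it is illustrative only and not taken verbatim from the changed files.

package main

import (
	"fmt"

	"k8s.io/utils/ptr"
)

func main() {
	// ptr.To replaces the type-specific constructors pointer.Bool, pointer.Int32, and so on.
	suspend := ptr.To(true)      // *bool
	replicas := ptr.To[int32](1) // *int32

	// ptr.Deref replaces pointer.Int32Deref and friends: it returns the pointed-to value,
	// or the supplied default when the pointer is nil.
	fmt.Println(*suspend, ptr.Deref(replicas, 1), ptr.Deref[int32](nil, 1))
}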
@@ -24,7 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
-"k8s.io/utils/pointer"
+"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -48,11 +48,11 @@ func (j *KubeflowJob) IsSuspended() bool {
}

func (j *KubeflowJob) Suspend() {
-j.KFJobControl.RunPolicy().Suspend = pointer.Bool(true)
+j.KFJobControl.RunPolicy().Suspend = ptr.To(true)
}

func (j *KubeflowJob) RunWithPodSetsInfo(podSetInfos []jobframework.PodSetInfo) error {
-j.KFJobControl.RunPolicy().Suspend = pointer.Bool(false)
+j.KFJobControl.RunPolicy().Suspend = ptr.To(false)
orderedReplicaTypes := j.KFJobControl.OrderedReplicaTypes(j.KFJobControl.ReplicaSpecs())

if len(podSetInfos) != len(orderedReplicaTypes) {
@@ -146,5 +146,5 @@ func (j *KubeflowJob) PriorityClass() string {
}

func podsCount(replicaSpecs map[kftraining.ReplicaType]*kftraining.ReplicaSpec, replicaType kftraining.ReplicaType) int32 {
-return pointer.Int32Deref(replicaSpecs[replicaType].Replicas, 1)
+return ptr.Deref(replicaSpecs[replicaType].Replicas, 1)
}
10 changes: 5 additions & 5 deletions pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go
@@ -22,9 +22,9 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
+"k8s.io/utils/ptr"

"sigs.k8s.io/kueue/pkg/controller/constants"
-"sigs.k8s.io/kueue/pkg/util/pointer"
)

// PyTorchJobWrapper wraps a Job.
@@ -40,11 +40,11 @@ func MakePyTorchJob(name, ns string) *PyTorchJobWrapper {
},
Spec: kftraining.PyTorchJobSpec{
RunPolicy: kftraining.RunPolicy{
-Suspend: pointer.Bool(true),
+Suspend: ptr.To(true),
},
PyTorchReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{
kftraining.PyTorchJobReplicaTypeMaster: {
-Replicas: pointer.Int32(1),
+Replicas: ptr.To[int32](1),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: "Never",
@@ -61,7 +61,7 @@ func MakePyTorchJob(name, ns string) *PyTorchJobWrapper {
},
},
kftraining.PyTorchJobReplicaTypeWorker: {
-Replicas: pointer.Int32(1),
+Replicas: ptr.To[int32](1),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: "Never",
@@ -113,7 +113,7 @@ func (j *PyTorchJobWrapper) Request(replicaType kftraining.ReplicaType, r corev1

// Parallelism updates job parallelism.
func (j *PyTorchJobWrapper) Parallelism(p int32) *PyTorchJobWrapper {
-j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Replicas = pointer.Int32(p)
+j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Replicas = ptr.To(p)
return j
}

@@ -29,7 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
-"k8s.io/utils/pointer"
+"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -59,12 +59,11 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu

ginkgo.BeforeAll(func() {
fwk = &framework.Framework{
-ManagerSetup: managerSetup(jobframework.WithManageJobsWithoutQueueName(true)),
-CRDPath: crdPath,
-DepCRDPaths: []string{pytorchCrdPath},
+CRDPath: crdPath,
+DepCRDPaths: []string{pytorchCrdPath},
}

-ctx, cfg, k8sClient = fwk.Setup()
+cfg = fwk.Init()
+ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true)))
})
ginkgo.AfterAll(func() {
fwk.Teardown()
@@ -168,14 +167,14 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
-Count: pointer.Int32(createdWorkload.Spec.PodSets[0].Count),
+Count: ptr.To(createdWorkload.Spec.PodSets[0].Count),
},
kueue.PodSetAssignment{
Name: "Worker",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "spot",
},
-Count: pointer.Int32(createdWorkload.Spec.PodSets[1].Count),
+Count: ptr.To(createdWorkload.Spec.PodSets[1].Count),
},
).
Obj()
@@ -203,7 +202,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
}, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue())

ginkgo.By("checking the job gets suspended when parallelism changes and the added node selectors are removed")
-parallelism := pointer.Int32Deref(job.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Replicas, 1)
+parallelism := ptr.Deref(job.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Replicas, 1)
newParallelism := parallelism + 1
createdJob.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Replicas = &newParallelism
gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed())
@@ -236,14 +235,14 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
-Count: pointer.Int32(createdWorkload.Spec.PodSets[0].Count),
+Count: ptr.To(createdWorkload.Spec.PodSets[0].Count),
},
kueue.PodSetAssignment{
Name: "Worker",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "spot",
},
-Count: pointer.Int32(createdWorkload.Spec.PodSets[1].Count),
+Count: ptr.To(createdWorkload.Spec.PodSets[1].Count),
},
).
Obj()
@@ -287,11 +286,11 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
var _ = ginkgo.Describe("Job controller for workloads when only jobs with queue are managed", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
ginkgo.BeforeAll(func() {
fwk = &framework.Framework{
-ManagerSetup: managerSetup(),
-CRDPath: crdPath,
-DepCRDPaths: []string{pytorchCrdPath},
+CRDPath: crdPath,
+DepCRDPaths: []string{pytorchCrdPath},
}
-ctx, cfg, k8sClient = fwk.Setup()
+cfg := fwk.Init()
+ctx, k8sClient = fwk.RunManager(cfg, managerSetup())
})
ginkgo.AfterAll(func() {
fwk.Teardown()
@@ -353,11 +352,11 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O

ginkgo.BeforeAll(func() {
fwk = &framework.Framework{
-ManagerSetup: managerSetup(jobframework.WithWaitForPodsReady(true)),
-CRDPath: crdPath,
-DepCRDPaths: []string{pytorchCrdPath},
+CRDPath: crdPath,
+DepCRDPaths: []string{pytorchCrdPath},
}
-ctx, cfg, k8sClient = fwk.Setup()
+cfg := fwk.Init()
+ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(true)))

ginkgo.By("Create a resource flavor")
gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed())
@@ -405,14 +404,14 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "default",
},
-Count: pointer.Int32(createdWorkload.Spec.PodSets[0].Count),
+Count: ptr.To(createdWorkload.Spec.PodSets[0].Count),
},
kueue.PodSetAssignment{
Name: "Worker",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "default",
},
-Count: pointer.Int32(createdWorkload.Spec.PodSets[1].Count),
+Count: ptr.To(createdWorkload.Spec.PodSets[1].Count),
},
).
Obj()
@@ -423,7 +422,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.RunPolicy.Suspend
-}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(false)))
+}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false)))

if podsReadyTestSpec.beforeJobStatus != nil {
ginkgo.By("Update the job status to simulate its initial progress towards completion")
@@ -562,11 +561,11 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde

ginkgo.BeforeAll(func() {
fwk = &framework.Framework{
-ManagerSetup: managerAndSchedulerSetup(),
-CRDPath: crdPath,
-DepCRDPaths: []string{pytorchCrdPath},
+CRDPath: crdPath,
+DepCRDPaths: []string{pytorchCrdPath},
}
-ctx, cfg, k8sClient = fwk.Setup()
+cfg := fwk.Init()
+ctx, k8sClient = fwk.RunManager(cfg, managerAndSchedulerSetup())
})
ginkgo.AfterAll(func() {
fwk.Teardown()
@@ -616,7 +615,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)).
Should(gomega.Succeed())
return createdJob.Spec.RunPolicy.Suspend
-}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(false)))
+}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false)))
gomega.Expect(createdJob.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name))
gomega.Expect(createdJob.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0)
@@ -650,7 +649,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.RunPolicy.Suspend
-}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(true)))
+}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(true)))
})

// backup the node selectors
Expand All @@ -664,7 +663,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.RunPolicy.Suspend
-}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(false)))
+}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false)))
})

ginkgo.By("the node selectors should be updated", func() {
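For reference, every BeforeAll block above now follows the same shape: the framework is constructed with only the CRD paths, Init() brings up the test environment and returns its config, and RunManager() builds and starts the controller manager from the test's setup function. A rough sketch of that flow, reconstructed from this diff (the framework.Framework fields and the managerSetup helper are the ones used in these tests, not a general-purpose API):

ginkgo.BeforeAll(func() {
	fwk = &framework.Framework{
		CRDPath:     crdPath,                  // Kueue CRDs
		DepCRDPaths: []string{pytorchCrdPath}, // dependent PyTorchJob CRD
	}
	cfg = fwk.Init()                                     // start the test environment, return its config
	ctx, k8sClient = fwk.RunManager(cfg, managerSetup()) // build and run the controller manager
})

ginkgo.AfterAll(func() {
	fwk.Teardown()
})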
