diff --git a/pkg/common/util/v1/testutil/const.go b/pkg/common/util/v1/testutil/const.go deleted file mode 100644 index 1260bb4710..0000000000 --- a/pkg/common/util/v1/testutil/const.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2018 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testutil - -import ( - "time" -) - -const ( - TestImageName = "test-image-for-kubeflow-training-operator:latest" - TestTFJobName = "test-tfjob" - LabelWorker = "worker" - LabelPS = "ps" - LabelChief = "chief" - TFJobKind = "TFJob" - - SleepInterval = 500 * time.Millisecond - ThreadCount = 1 -) - -var ( - AlwaysReady = func() bool { return true } -) diff --git a/pkg/common/util/v1/testutil/util.go b/pkg/common/util/v1/testutil/util.go deleted file mode 100644 index 5a6befaf26..0000000000 --- a/pkg/common/util/v1/testutil/util.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2018 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testutil - -import ( - "testing" - - commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/cache" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" -) - -const ( - LabelGroupName = "group-name" - JobNameLabel = "job-name" -) - -var ( - // KeyFunc is the short name to DeletionHandlingMetaNamespaceKeyFunc. - // IndexerInformer uses a delta queue, therefore for deletes we have to use this - // key function but it should be just fine for non delete events. - KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc - GroupName = kubeflowv1.GroupVersion.Group - ControllerName = "training-operator" -) - -func GenOwnerReference(job metav1.Object, apiVersion string, kind string) *metav1.OwnerReference { - boolPtr := func(b bool) *bool { return &b } - controllerRef := &metav1.OwnerReference{ - APIVersion: apiVersion, - Kind: kind, - Name: job.GetName(), - UID: job.GetUID(), - BlockOwnerDeletion: boolPtr(true), - Controller: boolPtr(true), - } - - return controllerRef -} - -// ConvertTFJobToUnstructured uses function ToUnstructured to convert TFJob to Unstructured. -func ConvertTFJobToUnstructured(tfJob *kubeflowv1.TFJob) (*unstructured.Unstructured, error) { - object, err := runtime.DefaultUnstructuredConverter.ToUnstructured(tfJob) - if err != nil { - return nil, err - } - return &unstructured.Unstructured{ - Object: object, - }, nil -} - -func GetKey(tfJob *kubeflowv1.TFJob, t *testing.T) string { - key, err := KeyFunc(tfJob) - if err != nil { - t.Errorf("Unexpected error getting key for job %v: %v", tfJob.Name, err) - return "" - } - return key -} - -func CheckCondition(tfJob *kubeflowv1.TFJob, condition commonv1.JobConditionType, reason string) bool { - for _, v := range tfJob.Status.Conditions { - if v.Type == condition && v.Status == corev1.ConditionTrue && v.Reason == reason { - return true - } - } - return false -} diff --git a/pkg/controller.v1/tensorflow/job_test.go b/pkg/controller.v1/tensorflow/job_test.go index b48e0bc4c7..869b116fcd 100644 --- a/pkg/controller.v1/tensorflow/job_test.go +++ b/pkg/controller.v1/tensorflow/job_test.go @@ -36,7 +36,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "github.com/kubeflow/training-operator/pkg/common/util/v1/testutil" + tftestutil "github.com/kubeflow/training-operator/pkg/controller.v1/tensorflow/testutil" ) var _ = Describe("TFJob controller", func() { @@ -57,11 +57,11 @@ var _ = Describe("TFJob controller", func() { ctx := context.Background() - tfJob := testutil.NewTFJob(1, 0) + tfJob := tftestutil.NewTFJob(1, 0) tfJob.SetName(testJobName) tfJob.SetNamespace(testNamespace) - decoyJob := testutil.NewTFJob(2, 3) + decoyJob := tftestutil.NewTFJob(2, 3) decoyJob.SetName(decoyJobName) decoyJob.SetNamespace(testNamespace) @@ -91,7 +91,7 @@ var _ = Describe("TFJob controller", func() { testLabelVal := "1" testJobName := "test-copy-labels-anno" - tfjob := testutil.NewTFJob(1, 0) + tfjob := tftestutil.NewTFJob(1, 0) tfjob.SetName(testJobName) annotations := map[string]string{ testAnnotationKey: testAnnotationVal, @@ -168,7 +168,7 @@ var _ = Describe("TFJob controller", func() { testCases := []testCase{ { description: "4 workers and 2 ps is running, policy is all", - tfJob: testutil.NewTFJobWithCleanPolicy(0, 4, 2, commonv1.CleanPodPolicyAll), + tfJob: tftestutil.NewTFJobWithCleanPolicy(0, 4, 2, commonv1.CleanPodPolicyAll), pendingWorkerPods: 0, activeWorkerPods: 4, @@ -187,7 +187,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "4 workers and 2 ps is running, policy is running", - tfJob: testutil.NewTFJobWithCleanPolicy(0, 4, 2, commonv1.CleanPodPolicyRunning), + tfJob: tftestutil.NewTFJobWithCleanPolicy(0, 4, 2, commonv1.CleanPodPolicyRunning), pendingWorkerPods: 0, activeWorkerPods: 4, @@ -206,7 +206,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "4 workers and 2 ps is succeeded, policy is running", - tfJob: testutil.NewTFJobWithCleanPolicy(0, 4, 2, commonv1.CleanPodPolicyRunning), + tfJob: tftestutil.NewTFJobWithCleanPolicy(0, 4, 2, commonv1.CleanPodPolicyRunning), pendingWorkerPods: 0, activeWorkerPods: 0, @@ -225,7 +225,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "4 workers and 2 ps is succeeded, policy is None", - tfJob: testutil.NewTFJobWithCleanPolicy(0, 4, 2, commonv1.CleanPodPolicyNone), + tfJob: tftestutil.NewTFJobWithCleanPolicy(0, 4, 2, commonv1.CleanPodPolicyNone), pendingWorkerPods: 0, activeWorkerPods: 0, @@ -266,15 +266,15 @@ var _ = Describe("TFJob controller", func() { } By("creating Services and Pods with designed phases") - testutil.SetPodsStatuses(testK8sClient, tc.tfJob, testutil.LabelWorker, + tftestutil.SetPodsStatuses(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypeWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, refs, basicLabels) - testutil.SetPodsStatuses(testK8sClient, tc.tfJob, testutil.LabelPS, + tftestutil.SetPodsStatuses(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypePS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, refs, basicLabels) - testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) - testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) + tftestutil.SetServices(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypeWorker, tc.activeWorkerServices, refs, basicLabels) + tftestutil.SetServices(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypePS, tc.activePSServices, refs, basicLabels) podList := &corev1.PodList{} Expect(testK8sClient.List(ctx, podList, listOpt)).Should(Succeed()) @@ -325,7 +325,7 @@ var _ = Describe("TFJob controller", func() { testCases := []testCase{ { description: "4 workers and 2 ps is running, ActiveDeadlineSeconds unset", - tfJob: testutil.NewTFJobWithActiveDeadlineSeconds(0, 4, 2, nil), + tfJob: tftestutil.NewTFJobWithActiveDeadlineSeconds(0, 4, 2, nil), pendingWorkerPods: 0, activeWorkerPods: 4, @@ -344,7 +344,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "4 workers and 2 ps is running, ActiveDeadlineSeconds is 2", - tfJob: testutil.NewTFJobWithActiveDeadlineSeconds(0, 4, 2, adsTest2), + tfJob: tftestutil.NewTFJobWithActiveDeadlineSeconds(0, 4, 2, adsTest2), pendingWorkerPods: 0, activeWorkerPods: 4, @@ -383,15 +383,15 @@ var _ = Describe("TFJob controller", func() { } By("creating Services and Pods with designed phases") - testutil.SetPodsStatuses(testK8sClient, tc.tfJob, testutil.LabelWorker, + tftestutil.SetPodsStatuses(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypeWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, refs, basicLabels) - testutil.SetPodsStatuses(testK8sClient, tc.tfJob, testutil.LabelPS, + tftestutil.SetPodsStatuses(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypePS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, refs, basicLabels) - testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) - testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) + tftestutil.SetServices(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypeWorker, tc.activeWorkerServices, refs, basicLabels) + tftestutil.SetServices(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypePS, tc.activePSServices, refs, basicLabels) podList := &corev1.PodList{} Expect(testK8sClient.List(ctx, podList, listOpt)).Should(Succeed()) @@ -453,7 +453,7 @@ var _ = Describe("TFJob controller", func() { testCases := []testCase{ { description: "4 workers each having 1 restartCount and 2 ps is running, backoffLimit 4 ", - tfJob: testutil.NewTFJobWithBackoffLimit(0, 4, 2, backoffLimitTest4), + tfJob: tftestutil.NewTFJobWithBackoffLimit(0, 4, 2, backoffLimitTest4), pendingWorkerPods: 0, activeWorkerPods: 4, @@ -495,15 +495,15 @@ var _ = Describe("TFJob controller", func() { } By("creating Services and Pods with designed phases") - testutil.SetPodsStatuses(testK8sClient, tc.tfJob, testutil.LabelWorker, + tftestutil.SetPodsStatuses(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypeWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, tc.restartCounts, refs, basicLabels) - testutil.SetPodsStatuses(testK8sClient, tc.tfJob, testutil.LabelPS, + tftestutil.SetPodsStatuses(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypePS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, tc.restartCounts, refs, basicLabels) - testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) - testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) + tftestutil.SetServices(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypeWorker, tc.activeWorkerServices, refs, basicLabels) + tftestutil.SetServices(testK8sClient, tc.tfJob, kubeflowv1.TFJobReplicaTypePS, tc.activePSServices, refs, basicLabels) podList := &corev1.PodList{} Expect(testK8sClient.List(ctx, podList, listOpt)).Should(Succeed()) @@ -537,12 +537,12 @@ var _ = Describe("TFJob controller", func() { testCases := []testCase{ { description: "succeeded job with TTL 3s", - tfJob: testutil.NewTFJobWithCleanupJobDelay(0, 1, 0, pointer.Int32(3)), + tfJob: tftestutil.NewTFJobWithCleanupJobDelay(0, 1, 0, pointer.Int32(3)), phase: corev1.PodSucceeded, }, { description: "failed job with TTL 3s", - tfJob: testutil.NewTFJobWithCleanupJobDelay(0, 1, 0, pointer.Int32(3)), + tfJob: tftestutil.NewTFJobWithCleanupJobDelay(0, 1, 0, pointer.Int32(3)), phase: corev1.PodFailed, }, } @@ -570,7 +570,7 @@ var _ = Describe("TFJob controller", func() { refs := []metav1.OwnerReference{ *reconciler.GenOwnerReference(tc.tfJob), } - pod := testutil.NewBasePod("pod", tc.tfJob, refs) + pod := tftestutil.NewBasePod("pod", tc.tfJob, refs) pod.Status.Phase = tc.phase By("update job replica statuses") @@ -661,7 +661,7 @@ var _ = Describe("Test for controller.v1/common", func() { } }, Entry("TFJob shouldn't be removed since TTL is nil", &cleanUpCases{ - tfJob: testutil.NewTFJobWithCleanupJobDelay(1, 2, 0, nil), + tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, nil), runPolicy: &kubeflowv1.RunPolicy{ TTLSecondsAfterFinished: nil, }, @@ -670,7 +670,7 @@ var _ = Describe("Test for controller.v1/common", func() { wantErr: false, }), Entry("Error is occurred since completionTime is nil", &cleanUpCases{ - tfJob: testutil.NewTFJobWithCleanupJobDelay(1, 2, 0, pointer.Int32(10)), + tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, pointer.Int32(10)), runPolicy: &kubeflowv1.RunPolicy{ TTLSecondsAfterFinished: pointer.Int32(10), }, @@ -681,7 +681,7 @@ var _ = Describe("Test for controller.v1/common", func() { wantErr: true, }), Entry("TFJob is removed since exceeded TTL (TTL is 180s)", &cleanUpCases{ - tfJob: testutil.NewTFJobWithCleanupJobDelay(1, 2, 0, pointer.Int32(180)), + tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, pointer.Int32(180)), runPolicy: &kubeflowv1.RunPolicy{ TTLSecondsAfterFinished: pointer.Int32(180), }, @@ -694,7 +694,7 @@ var _ = Describe("Test for controller.v1/common", func() { wantErr: false, }), Entry("TFJob is removed since (TTL is 0s)", &cleanUpCases{ - tfJob: testutil.NewTFJobWithCleanupJobDelay(1, 2, 0, pointer.Int32(0)), + tfJob: tftestutil.NewTFJobWithCleanupJobDelay(1, 2, 0, pointer.Int32(0)), runPolicy: &kubeflowv1.RunPolicy{ TTLSecondsAfterFinished: pointer.Int32(0), }, @@ -776,7 +776,7 @@ var _ = Describe("Test for controller.v1/common", func() { } }, Entry("Failed to create service since containerPort is missing", &createServiceCases{ - tfJob: testutil.NewTFJobV2(2, 0, 0, 1, 0), + tfJob: tftestutil.NewTFJobV2(2, 0, 0, 1, 0), spec: &kubeflowv1.ReplicaSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -792,14 +792,14 @@ var _ = Describe("Test for controller.v1/common", func() { wantErr: true, }), Entry("Failed to create service since Job's ownerReference is invalid", &createServiceCases{ - tfJob: testutil.NewTFJobV2(2, 0, 0, 1, 0), - spec: &kubeflowv1.ReplicaSpec{Template: testutil.NewTFReplicaSpecTemplate()}, + tfJob: tftestutil.NewTFJobV2(2, 0, 0, 1, 0), + spec: &kubeflowv1.ReplicaSpec{Template: tftestutil.NewTFReplicaSpecTemplate()}, index: 1, wantErr: true, }), Entry("Succeeded to create service", &createServiceCases{ - tfJob: testutil.NewTFJobV2(2, 0, 0, 1, 0), - spec: &kubeflowv1.ReplicaSpec{Template: testutil.NewTFReplicaSpecTemplate()}, + tfJob: tftestutil.NewTFJobV2(2, 0, 0, 1, 0), + spec: &kubeflowv1.ReplicaSpec{Template: tftestutil.NewTFReplicaSpecTemplate()}, index: 0, wantErr: false, uid: uuid.NewUUID(), diff --git a/pkg/controller.v1/tensorflow/pod_test.go b/pkg/controller.v1/tensorflow/pod_test.go index 14328bae94..3c3a830217 100644 --- a/pkg/controller.v1/tensorflow/pod_test.go +++ b/pkg/controller.v1/tensorflow/pod_test.go @@ -32,7 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "github.com/kubeflow/training-operator/pkg/common/util/v1/testutil" + tftestutil "github.com/kubeflow/training-operator/pkg/controller.v1/tensorflow/testutil" ) var _ = Describe("TFJob controller", func() { @@ -52,52 +52,52 @@ var _ = Describe("TFJob controller", func() { } testCase := []tc{ { - tfJob: testutil.NewTFJobWithNamespace(1, 0, "ns0"), + tfJob: tftestutil.NewTFJobWithNamespace(1, 0, "ns0"), rt: "worker", index: "0", customClusterDomain: "", expectedClusterSpec: "", }, { - tfJob: testutil.NewTFJobWithNamespace(1, 0, "ns1"), + tfJob: tftestutil.NewTFJobWithNamespace(1, 0, "ns1"), rt: "worker", index: "0", customClusterDomain: "tf.training.com", expectedClusterSpec: "", }, { - tfJob: testutil.NewTFJobWithNamespace(1, 1, "ns2"), + tfJob: tftestutil.NewTFJobWithNamespace(1, 1, "ns2"), rt: "worker", index: "0", customClusterDomain: "tf.training.org", - expectedClusterSpec: `{"cluster":{"ps":["` + testutil.TestTFJobName + - `-ps-0.ns2.svc.tf.training.org:2222"],"worker":["` + testutil.TestTFJobName + + expectedClusterSpec: `{"cluster":{"ps":["` + tftestutil.TestTFJobName + + `-ps-0.ns2.svc.tf.training.org:2222"],"worker":["` + tftestutil.TestTFJobName + `-worker-0.ns2.svc.tf.training.org:2222"]},"task":{"type":"worker","index":0},"environment":"cloud"}`, }, { - tfJob: testutil.NewTFJobWithEvaluatorAndNamespace(1, 1, 1, "ns3"), + tfJob: tftestutil.NewTFJobWithEvaluatorAndNamespace(1, 1, 1, "ns3"), rt: "worker", index: "0", customClusterDomain: "tf.training.io", - expectedClusterSpec: `{"cluster":{"evaluator":["` + testutil.TestTFJobName + - `-evaluator-0.ns3.svc.tf.training.io:2222"],"ps":["` + testutil.TestTFJobName + - `-ps-0.ns3.svc.tf.training.io:2222"],"worker":["` + testutil.TestTFJobName + + expectedClusterSpec: `{"cluster":{"evaluator":["` + tftestutil.TestTFJobName + + `-evaluator-0.ns3.svc.tf.training.io:2222"],"ps":["` + tftestutil.TestTFJobName + + `-ps-0.ns3.svc.tf.training.io:2222"],"worker":["` + tftestutil.TestTFJobName + `-worker-0.ns3.svc.tf.training.io:2222"]},"task":{"type":"worker","index":0},"environment":"cloud"}`, }, { - tfJob: testutil.NewTFJobWithEvaluatorAndNamespace(1, 1, 1, "ns3"), + tfJob: tftestutil.NewTFJobWithEvaluatorAndNamespace(1, 1, 1, "ns3"), rt: "worker", index: "0", customClusterDomain: "", - expectedClusterSpec: `{"cluster":{"evaluator":["` + testutil.TestTFJobName + - `-evaluator-0.ns3.svc:2222"],"ps":["` + testutil.TestTFJobName + - `-ps-0.ns3.svc:2222"],"worker":["` + testutil.TestTFJobName + + expectedClusterSpec: `{"cluster":{"evaluator":["` + tftestutil.TestTFJobName + + `-evaluator-0.ns3.svc:2222"],"ps":["` + tftestutil.TestTFJobName + + `-ps-0.ns3.svc:2222"],"worker":["` + tftestutil.TestTFJobName + `-worker-0.ns3.svc:2222"]},"task":{"type":"worker","index":0},"environment":"cloud"}`, }, } for _, c := range testCase { - c.tfJob.SetName("test-tfjob") + c.tfJob.SetName(tftestutil.TestTFJobName) c.tfJob.SetUID(uuid.NewUUID()) _ = os.Setenv(EnvCustomClusterDomain, c.customClusterDomain) @@ -135,19 +135,19 @@ var _ = Describe("TFJob controller", func() { } testCase := []tc{ { - tfJob: testutil.NewTFJob(1, 0), + tfJob: tftestutil.NewTFJob(1, 0), expected: false, }, { - tfJob: testutil.NewTFJob(1, 1), + tfJob: tftestutil.NewTFJob(1, 1), expected: true, }, { - tfJob: testutil.NewTFJob(0, 1), + tfJob: tftestutil.NewTFJob(0, 1), expected: false, }, { - tfJob: testutil.NewTFJobWithChief(1, 0), + tfJob: tftestutil.NewTFJobWithChief(1, 0), expected: true, }, } @@ -166,7 +166,7 @@ var _ = Describe("TFJob controller", func() { } testCase := []tc{ func() tc { - tfJob := testutil.NewTFJob(1, 0) + tfJob := tftestutil.NewTFJob(1, 0) specRestartPolicy := commonv1.RestartPolicyExitCode tfJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeWorker].RestartPolicy = specRestartPolicy return tc{ @@ -176,7 +176,7 @@ var _ = Describe("TFJob controller", func() { } }(), func() tc { - tfJob := testutil.NewTFJob(1, 0) + tfJob := tftestutil.NewTFJob(1, 0) specRestartPolicy := commonv1.RestartPolicyNever tfJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeWorker].RestartPolicy = specRestartPolicy return tc{ @@ -186,7 +186,7 @@ var _ = Describe("TFJob controller", func() { } }(), func() tc { - tfJob := testutil.NewTFJob(1, 0) + tfJob := tftestutil.NewTFJob(1, 0) specRestartPolicy := commonv1.RestartPolicyAlways tfJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeWorker].RestartPolicy = specRestartPolicy return tc{ @@ -196,7 +196,7 @@ var _ = Describe("TFJob controller", func() { } }(), func() tc { - tfJob := testutil.NewTFJob(1, 0) + tfJob := tftestutil.NewTFJob(1, 0) specRestartPolicy := commonv1.RestartPolicyOnFailure tfJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeWorker].RestartPolicy = specRestartPolicy return tc{ @@ -220,7 +220,7 @@ var _ = Describe("TFJob controller", func() { By("Creating TFJob \"test-exit-code\" with 1 worker only") ctx := context.Background() - tfJob := testutil.NewTFJob(1, 0) + tfJob := tftestutil.NewTFJob(1, 0) tfJob.SetName("test-exit-code") tfJob.SetUID(uuid.NewUUID()) tfJob.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeWorker].RestartPolicy = commonv1.RestartPolicyExitCode @@ -229,14 +229,14 @@ var _ = Describe("TFJob controller", func() { *reconciler.GenOwnerReference(tfJob), } By("creating worker Pod") - pod := testutil.NewPod(tfJob, testutil.LabelWorker, 0, refs) + pod := tftestutil.NewPod(tfJob, kubeflowv1.TFJobReplicaTypeWorker, 0, refs) basicLabels := reconciler.GenLabels(tfJob.GetName()) for k, v := range basicLabels { pod.Labels[k] = v } pod.Spec.Containers = append(pod.Spec.Containers, corev1.Container{ Name: kubeflowv1.TFJobDefaultContainerName, - Image: testutil.DummyContainerImage, + Image: tftestutil.DummyContainerImage, }) Expect(testK8sClient.Create(ctx, pod)).Should(Succeed()) @@ -284,7 +284,7 @@ var _ = Describe("TFJob controller", func() { It("should delete redundant Pods", func() { ctx := context.Background() - tfJob := testutil.NewTFJob(2, 0) + tfJob := tftestutil.NewTFJob(2, 0) //tfJob.SelfLink = "/api/v1/namespaces/default/tfjob/test-tfjob" tfJob.SetName("test-scale-down") tfJob.SetUID(uuid.NewUUID()) @@ -293,9 +293,9 @@ var _ = Describe("TFJob controller", func() { refs := []metav1.OwnerReference{*reconciler.GenOwnerReference(tfJob)} pods := []*corev1.Pod{ - testutil.NewPod(tfJob, testutil.LabelWorker, 0, refs), - testutil.NewPod(tfJob, testutil.LabelWorker, 1, refs), - testutil.NewPod(tfJob, testutil.LabelWorker, 2, refs), + tftestutil.NewPod(tfJob, kubeflowv1.TFJobReplicaTypeWorker, 0, refs), + tftestutil.NewPod(tfJob, kubeflowv1.TFJobReplicaTypeWorker, 1, refs), + tftestutil.NewPod(tfJob, kubeflowv1.TFJobReplicaTypeWorker, 2, refs), } for i := range pods { @@ -349,7 +349,7 @@ var _ = Describe("TFJob controller", func() { It("should create missing Pods", func() { ctx := context.Background() - tfJob := testutil.NewTFJob(3, 0) + tfJob := tftestutil.NewTFJob(3, 0) tfJob.SetName("test-scale-up") tfJob.SetUID(uuid.NewUUID()) tfJob.Spec.EnableDynamicWorker = true @@ -357,7 +357,7 @@ var _ = Describe("TFJob controller", func() { refs := []metav1.OwnerReference{*reconciler.GenOwnerReference(tfJob)} pods := []*corev1.Pod{ - testutil.NewPod(tfJob, testutil.LabelWorker, 0, refs), + tftestutil.NewPod(tfJob, kubeflowv1.TFJobReplicaTypeWorker, 0, refs), } for i := range pods { @@ -431,77 +431,77 @@ var _ = Describe("TFJob controller", func() { }{ { workers: [3]int32{0, 0, 1}, - tfJob: testutil.NewTFJobV2(1, 1, 0, 0, 0), + tfJob: tftestutil.NewTFJobV2(1, 1, 0, 0, 0), expected: false, expectedErr: false, replicas: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{ kubeflowv1.TFJobReplicaTypeWorker: { Replicas: newInt32(1), - Template: testutil.NewTFReplicaSpecTemplate(), + Template: tftestutil.NewTFReplicaSpecTemplate(), }, kubeflowv1.TFJobReplicaTypePS: { Replicas: newInt32(1), - Template: testutil.NewTFReplicaSpecTemplate(), + Template: tftestutil.NewTFReplicaSpecTemplate(), }, }, }, { workers: [3]int32{0, 1, 0}, - tfJob: testutil.NewTFJobV2(1, 0, 0, 0, 0), + tfJob: tftestutil.NewTFJobV2(1, 0, 0, 0, 0), expected: true, expectedErr: false, replicas: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{ kubeflowv1.TFJobReplicaTypeWorker: { Replicas: newInt32(1), - Template: testutil.NewTFReplicaSpecTemplate(), + Template: tftestutil.NewTFReplicaSpecTemplate(), }, }, }, { workers: [3]int32{0, 0, 0}, - tfJob: testutil.NewTFJobV2(0, 0, 1, 0, 0), + tfJob: tftestutil.NewTFJobV2(0, 0, 1, 0, 0), expected: true, expectedErr: false, replicas: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{ kubeflowv1.TFJobReplicaTypeMaster: { Replicas: newInt32(1), - Template: testutil.NewTFReplicaSpecTemplate(), + Template: tftestutil.NewTFReplicaSpecTemplate(), }, }, }, { workers: [3]int32{0, 0, 0}, - tfJob: testutil.NewTFJobV2(0, 0, 0, 1, 0), + tfJob: tftestutil.NewTFJobV2(0, 0, 0, 1, 0), expected: true, expectedErr: false, replicas: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{ kubeflowv1.TFJobReplicaTypeChief: { Replicas: newInt32(1), - Template: testutil.NewTFReplicaSpecTemplate(), + Template: tftestutil.NewTFReplicaSpecTemplate(), }, }, }, { workers: [3]int32{1, 1, 0}, - tfJob: testutil.NewTFJobV2(2, 0, 0, 0, 0), + tfJob: tftestutil.NewTFJobV2(2, 0, 0, 0, 0), expected: true, expectedErr: false, replicas: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{ kubeflowv1.TFJobReplicaTypeWorker: { Replicas: newInt32(2), - Template: testutil.NewTFReplicaSpecTemplate(), + Template: tftestutil.NewTFReplicaSpecTemplate(), }, }, }, { workers: [3]int32{1, 0, 1}, - tfJob: testutil.NewTFJobV2(2, 0, 0, 0, 0), + tfJob: tftestutil.NewTFJobV2(2, 0, 0, 0, 0), expected: false, expectedErr: false, replicas: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{ kubeflowv1.TFJobReplicaTypeWorker: { Replicas: newInt32(2), - Template: testutil.NewTFReplicaSpecTemplate(), + Template: tftestutil.NewTFReplicaSpecTemplate(), }, }, }, diff --git a/pkg/controller.v1/tensorflow/status_test.go b/pkg/controller.v1/tensorflow/status_test.go index d073e6c195..92affc52ae 100644 --- a/pkg/controller.v1/tensorflow/status_test.go +++ b/pkg/controller.v1/tensorflow/status_test.go @@ -31,7 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "github.com/kubeflow/training-operator/pkg/common/util/v1/testutil" + tftestutil "github.com/kubeflow/training-operator/pkg/controller.v1/tensorflow/testutil" ) var _ = Describe("TFJob controller", func() { @@ -44,14 +44,14 @@ var _ = Describe("TFJob controller", func() { Context("Test Failed", func() { It("should update TFJob with failed status", func() { By("creating a TFJob with replicaStatues initialized") - tfJob := testutil.NewTFJob(3, 0) + tfJob := tftestutil.NewTFJob(3, 0) initializeReplicaStatuses(&tfJob.Status, kubeflowv1.TFJobReplicaTypeWorker) By("prepare pod") refs := []metav1.OwnerReference{ *reconciler.GenOwnerReference(tfJob), } - pod := testutil.NewBasePod("pod", tfJob, refs) + pod := tftestutil.NewBasePod("pod", tfJob, refs) pod.Status.Phase = v1.PodFailed By("update job replica statuses") @@ -99,7 +99,7 @@ var _ = Describe("TFJob controller", func() { testCases := []testCase{ { description: "Chief worker is succeeded", - tfJob: testutil.NewTFJobWithChief(1, 0), + tfJob: tftestutil.NewTFJobWithChief(1, 0), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 0, @@ -115,7 +115,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "Chief worker is running", - tfJob: testutil.NewTFJobWithChief(1, 0), + tfJob: tftestutil.NewTFJobWithChief(1, 0), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 0, @@ -131,7 +131,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "Chief worker is failed", - tfJob: testutil.NewTFJobWithChief(1, 0), + tfJob: tftestutil.NewTFJobWithChief(1, 0), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 0, @@ -147,7 +147,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "(No chief worker) Worker is failed", - tfJob: testutil.NewTFJob(1, 0), + tfJob: tftestutil.NewTFJob(1, 0), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 0, @@ -163,7 +163,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "(No chief worker) Worker is succeeded", - tfJob: testutil.NewTFJob(1, 0), + tfJob: tftestutil.NewTFJob(1, 0), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 0, @@ -179,7 +179,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "(No chief worker) Worker is running", - tfJob: testutil.NewTFJob(1, 0), + tfJob: tftestutil.NewTFJob(1, 0), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 0, @@ -195,7 +195,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "(No chief worker) 2 workers are succeeded, 2 workers are active", - tfJob: testutil.NewTFJob(4, 2), + tfJob: tftestutil.NewTFJob(4, 2), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 2, @@ -211,7 +211,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "(No chief worker) 2 workers are running, 2 workers are failed", - tfJob: testutil.NewTFJob(4, 2), + tfJob: tftestutil.NewTFJob(4, 2), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 2, @@ -227,7 +227,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "(No chief worker) 2 workers are succeeded, 2 workers are failed", - tfJob: testutil.NewTFJob(4, 2), + tfJob: tftestutil.NewTFJob(4, 2), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 2, @@ -243,7 +243,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "(No chief worker) worker-0 are succeeded, 3 workers are active", - tfJob: testutil.NewTFJob(4, 2), + tfJob: tftestutil.NewTFJob(4, 2), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 2, @@ -259,7 +259,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "(No chief worker, successPolicy: AllWorkers) worker-0 are succeeded, 3 workers are active", - tfJob: testutil.NewTFJobWithSuccessPolicy(4, 0, kubeflowv1.SuccessPolicyAllWorkers), + tfJob: tftestutil.NewTFJobWithSuccessPolicy(4, 0, kubeflowv1.SuccessPolicyAllWorkers), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 0, @@ -275,7 +275,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "(No chief worker, successPolicy: AllWorkers) 4 workers are succeeded", - tfJob: testutil.NewTFJobWithSuccessPolicy(4, 0, kubeflowv1.SuccessPolicyAllWorkers), + tfJob: tftestutil.NewTFJobWithSuccessPolicy(4, 0, kubeflowv1.SuccessPolicyAllWorkers), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 0, @@ -291,7 +291,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "(No chief worker, successPolicy: AllWorkers) worker-0 is succeeded, 2 workers are running, 1 worker is failed", - tfJob: testutil.NewTFJobWithSuccessPolicy(4, 0, kubeflowv1.SuccessPolicyAllWorkers), + tfJob: tftestutil.NewTFJobWithSuccessPolicy(4, 0, kubeflowv1.SuccessPolicyAllWorkers), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 0, @@ -307,7 +307,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "Chief is running, workers are failed", - tfJob: testutil.NewTFJobWithChief(4, 2), + tfJob: tftestutil.NewTFJobWithChief(4, 2), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 2, @@ -323,7 +323,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "Chief is running, workers are succeeded", - tfJob: testutil.NewTFJobWithChief(4, 2), + tfJob: tftestutil.NewTFJobWithChief(4, 2), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 2, @@ -339,7 +339,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "Chief is running, a PS is failed", - tfJob: testutil.NewTFJobWithChief(4, 2), + tfJob: tftestutil.NewTFJobWithChief(4, 2), expectedFailedPS: 1, expectedSucceededPS: 0, expectedActivePS: 1, @@ -355,7 +355,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "Chief is failed, workers are succeeded", - tfJob: testutil.NewTFJobWithChief(4, 2), + tfJob: tftestutil.NewTFJobWithChief(4, 2), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 2, @@ -371,7 +371,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "Chief is succeeded, workers are failed", - tfJob: testutil.NewTFJobWithChief(4, 2), + tfJob: tftestutil.NewTFJobWithChief(4, 2), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 2, @@ -387,7 +387,7 @@ var _ = Describe("TFJob controller", func() { }, { description: "Chief is failed and restarting", - tfJob: testutil.NewTFJobWithChief(4, 2), + tfJob: tftestutil.NewTFJobWithChief(4, 2), expectedFailedPS: 0, expectedSucceededPS: 0, expectedActivePS: 2, @@ -477,18 +477,11 @@ func setStatusForTest(tfJob *kubeflowv1.TFJob, rtype commonv1.ReplicaType, faile ctx := context.Background() - var typ string - switch rtype { - case kubeflowv1.TFJobReplicaTypeWorker: - typ = testutil.LabelWorker - case kubeflowv1.TFJobReplicaTypePS: - typ = testutil.LabelPS - case kubeflowv1.TFJobReplicaTypeChief: - typ = testutil.LabelChief - default: - fmt.Println("wrong type") - } - Expect(typ).ShouldNot(Equal("")) + Expect(rtype).Should(BeElementOf([]kubeflowv1.ReplicaType{ + kubeflowv1.TFJobReplicaTypeWorker, + kubeflowv1.TFJobReplicaTypePS, + kubeflowv1.TFJobReplicaTypeChief, + })) refs := []metav1.OwnerReference{ *reconciler.GenOwnerReference(tfJob), @@ -497,7 +490,7 @@ func setStatusForTest(tfJob *kubeflowv1.TFJob, rtype commonv1.ReplicaType, faile var i int32 index := 0 for i = 0; i < succeeded; i++ { - pod := testutil.NewPod(tfJob, typ, index, refs) + pod := tftestutil.NewPod(tfJob, rtype, index, refs) for k, v := range basicLabels { pod.Labels[k] = v } @@ -534,7 +527,7 @@ func setStatusForTest(tfJob *kubeflowv1.TFJob, rtype commonv1.ReplicaType, faile } for i = 0; i < failed; i++ { - pod := testutil.NewPod(tfJob, typ, index, refs) + pod := tftestutil.NewPod(tfJob, rtype, index, refs) for k, v := range basicLabels { pod.Labels[k] = v } @@ -572,7 +565,7 @@ func setStatusForTest(tfJob *kubeflowv1.TFJob, rtype commonv1.ReplicaType, faile } for i = 0; i < active; i++ { - pod := testutil.NewPod(tfJob, typ, index, refs) + pod := tftestutil.NewPod(tfJob, rtype, index, refs) for k, v := range basicLabels { pod.Labels[k] = v } diff --git a/pkg/common/util/v1/testutil/pod.go b/pkg/controller.v1/tensorflow/testutil/pod.go similarity index 88% rename from pkg/common/util/v1/testutil/pod.go rename to pkg/controller.v1/tensorflow/testutil/pod.go index 10816c2d43..f172b0c44b 100644 --- a/pkg/common/util/v1/testutil/pod.go +++ b/pkg/controller.v1/tensorflow/testutil/pod.go @@ -17,6 +17,7 @@ package testutil import ( "context" "fmt" + "strings" "time" commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" @@ -52,15 +53,15 @@ func NewBasePod(name string, job metav1.Object, refs []metav1.OwnerReference) *c } } -func NewPod(job metav1.Object, typ string, index int, refs []metav1.OwnerReference) *corev1.Pod { - pod := NewBasePod(fmt.Sprintf("%s-%s-%d", job.GetName(), typ, index), job, refs) - pod.Labels[commonv1.ReplicaTypeLabel] = typ +func NewPod(job metav1.Object, typ commonv1.ReplicaType, index int, refs []metav1.OwnerReference) *corev1.Pod { + pod := NewBasePod(fmt.Sprintf("%s-%s-%d", job.GetName(), strings.ToLower(string(typ)), index), job, refs) + pod.Labels[commonv1.ReplicaTypeLabel] = strings.ToLower(string(typ)) pod.Labels[commonv1.ReplicaIndexLabel] = fmt.Sprintf("%d", index) return pod } // NewPodList create count pods with the given phase for the given tfJob -func NewPodList(count int32, status corev1.PodPhase, job metav1.Object, typ string, start int32, refs []metav1.OwnerReference) []*corev1.Pod { +func NewPodList(count int32, status corev1.PodPhase, job metav1.Object, typ commonv1.ReplicaType, start int32, refs []metav1.OwnerReference) []*corev1.Pod { pods := []*corev1.Pod{} for i := int32(0); i < count; i++ { newPod := NewPod(job, typ, int(start+i), refs) @@ -70,7 +71,7 @@ func NewPodList(count int32, status corev1.PodPhase, job metav1.Object, typ stri return pods } -func SetPodsStatuses(client client.Client, job metav1.Object, typ string, +func SetPodsStatuses(client client.Client, job metav1.Object, typ commonv1.ReplicaType, pendingPods, activePods, succeededPods, failedPods int32, restartCounts []int32, refs []metav1.OwnerReference, basicLabels map[string]string) { timeout := 10 * time.Second diff --git a/pkg/common/util/v1/testutil/service.go b/pkg/controller.v1/tensorflow/testutil/service.go similarity index 79% rename from pkg/common/util/v1/testutil/service.go rename to pkg/controller.v1/tensorflow/testutil/service.go index caaea42a6d..3bb0e5320b 100644 --- a/pkg/common/util/v1/testutil/service.go +++ b/pkg/controller.v1/tensorflow/testutil/service.go @@ -17,6 +17,7 @@ package testutil import ( "context" "fmt" + "strings" commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" . "github.com/onsi/gomega" @@ -50,15 +51,15 @@ func NewBaseService(name string, job metav1.Object, refs []metav1.OwnerReference } } -func NewService(job metav1.Object, typ string, index int, refs []metav1.OwnerReference) *corev1.Service { - svc := NewBaseService(fmt.Sprintf("%s-%s-%d", job.GetName(), typ, index), job, refs) - svc.Labels[commonv1.ReplicaTypeLabel] = typ +func NewService(job metav1.Object, typ commonv1.ReplicaType, index int, refs []metav1.OwnerReference) *corev1.Service { + svc := NewBaseService(fmt.Sprintf("%s-%s-%d", job.GetName(), strings.ToLower(string(typ)), index), job, refs) + svc.Labels[commonv1.ReplicaTypeLabel] = strings.ToLower(string(typ)) svc.Labels[commonv1.ReplicaIndexLabel] = fmt.Sprintf("%d", index) return svc } // NewServiceList creates count pods with the given phase for the given tfJob -func NewServiceList(count int32, job metav1.Object, typ string, refs []metav1.OwnerReference) []*corev1.Service { +func NewServiceList(count int32, job metav1.Object, typ commonv1.ReplicaType, refs []metav1.OwnerReference) []*corev1.Service { services := []*corev1.Service{} for i := int32(0); i < count; i++ { newService := NewService(job, typ, int(i), refs) @@ -67,7 +68,7 @@ func NewServiceList(count int32, job metav1.Object, typ string, refs []metav1.Ow return services } -func SetServices(client client.Client, job metav1.Object, typ string, activeWorkerServices int32, +func SetServices(client client.Client, job metav1.Object, typ commonv1.ReplicaType, activeWorkerServices int32, refs []metav1.OwnerReference, basicLabels map[string]string) { ctx := context.Background() for _, svc := range NewServiceList(activeWorkerServices, job, typ, refs) { diff --git a/pkg/common/util/v1/testutil/tfjob.go b/pkg/controller.v1/tensorflow/testutil/tfjob.go similarity index 94% rename from pkg/common/util/v1/testutil/tfjob.go rename to pkg/controller.v1/tensorflow/testutil/tfjob.go index 76660b1db6..407717b9a6 100644 --- a/pkg/common/util/v1/testutil/tfjob.go +++ b/pkg/controller.v1/tensorflow/testutil/tfjob.go @@ -15,8 +15,6 @@ package testutil import ( - "time" - commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -24,6 +22,8 @@ import ( kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" ) +const TestTFJobName = "test-tfjob" + func NewTFJobWithCleanPolicy(chief, worker, ps int, policy commonv1.CleanPodPolicy) *kubeflowv1.TFJob { if chief == 1 { tfJob := NewTFJobWithChief(worker, ps) @@ -113,7 +113,7 @@ func NewTFJobWithSuccessPolicy(worker, ps int, successPolicy kubeflowv1.SuccessP func NewTFJob(worker, ps int) *kubeflowv1.TFJob { tfJob := &kubeflowv1.TFJob{ TypeMeta: metav1.TypeMeta{ - Kind: TFJobKind, + Kind: kubeflowv1.TFJobKind, }, ObjectMeta: metav1.ObjectMeta{ Name: TestTFJobName, @@ -148,7 +148,7 @@ func NewTFJob(worker, ps int) *kubeflowv1.TFJob { func NewTFJobV2(worker, ps, master, chief, evaluator int) *kubeflowv1.TFJob { tfJob := &kubeflowv1.TFJob{ TypeMeta: metav1.TypeMeta{ - Kind: TFJobKind, + Kind: kubeflowv1.TFJobKind, }, ObjectMeta: metav1.ObjectMeta{ Name: TestTFJobName, @@ -227,7 +227,7 @@ func NewTFReplicaSpecTemplate() v1.PodTemplateSpec { Containers: []v1.Container{ v1.Container{ Name: kubeflowv1.TFJobDefaultContainerName, - Image: TestImageName, + Image: "test-image-for-kubeflow-training-operator:latest", Args: []string{"Fake", "Fake"}, Ports: []v1.ContainerPort{ v1.ContainerPort{ @@ -241,7 +241,11 @@ func NewTFReplicaSpecTemplate() v1.PodTemplateSpec { } } -func SetTFJobCompletionTime(tfJob *kubeflowv1.TFJob) { - now := metav1.Time{Time: time.Now()} - tfJob.Status.CompletionTime = &now +func CheckCondition(tfJob *kubeflowv1.TFJob, condition commonv1.JobConditionType, reason string) bool { + for _, v := range tfJob.Status.Conditions { + if v.Type == condition && v.Status == v1.ConditionTrue && v.Reason == reason { + return true + } + } + return false } diff --git a/pkg/controller.v1/tensorflow/tfjob_controller_test.go b/pkg/controller.v1/tensorflow/tfjob_controller_test.go index f1ab3d0c12..3b98b19e1d 100644 --- a/pkg/controller.v1/tensorflow/tfjob_controller_test.go +++ b/pkg/controller.v1/tensorflow/tfjob_controller_test.go @@ -17,6 +17,7 @@ package tensorflow import ( "context" "fmt" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" @@ -26,7 +27,7 @@ import ( commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "github.com/kubeflow/training-operator/pkg/common/util/v1/testutil" + tftestutil "github.com/kubeflow/training-operator/pkg/controller.v1/tensorflow/testutil" ) var _ = Describe("TFJob controller", func() { @@ -176,18 +177,18 @@ var _ = Describe("TFJob controller", func() { jobName := fmt.Sprintf(jobNameTemplate, caseIdx) caseIdx++ - tfJob := testutil.NewTFJob(tc.worker, tc.ps) + tfJob := tftestutil.NewTFJob(tc.worker, tc.ps) tfJob.SetName(jobName) tfJob.SetUID(uuid.NewUUID()) refs := []metav1.OwnerReference{*reconciler.GenOwnerReference(tfJob)} basicLabels := reconciler.GenLabels(tfJob.GetName()) - testutil.SetPodsStatuses(testK8sClient, tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, refs, basicLabels) - testutil.SetPodsStatuses(testK8sClient, tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, refs, basicLabels) + tftestutil.SetPodsStatuses(testK8sClient, tfJob, kubeflowv1.TFJobReplicaTypeWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, refs, basicLabels) + tftestutil.SetPodsStatuses(testK8sClient, tfJob, kubeflowv1.TFJobReplicaTypePS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, refs, basicLabels) - testutil.SetServices(testK8sClient, tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) - testutil.SetServices(testK8sClient, tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) + tftestutil.SetServices(testK8sClient, tfJob, kubeflowv1.TFJobReplicaTypeWorker, tc.activeWorkerServices, refs, basicLabels) + tftestutil.SetServices(testK8sClient, tfJob, kubeflowv1.TFJobReplicaTypePS, tc.activePSServices, refs, basicLabels) totalPodNumber := int(tc.pendingWorkerPods + tc.activeWorkerPods + tc.succeededWorkerPods + tc.failedWorkerPods + tc.pendingPSPods + tc.activePSPods + tc.succeededPSPods + tc.failedPSPods) totalServiceNumber := int(tc.activeWorkerServices + tc.activePSServices) @@ -313,7 +314,7 @@ var _ = Describe("TFJob controller", func() { // Validate Conditions if tc.expectedCondition != nil { - Expect(testutil.CheckCondition(tfJob, *tc.expectedCondition, tc.expectedConditionReason)).Should(BeTrue()) + Expect(tftestutil.CheckCondition(tfJob, *tc.expectedCondition, tc.expectedConditionReason)).Should(BeTrue()) } } }) diff --git a/pkg/controller.v1/tensorflow/util.go b/pkg/controller.v1/tensorflow/util.go index bcef658609..ba87bc33c9 100644 --- a/pkg/controller.v1/tensorflow/util.go +++ b/pkg/controller.v1/tensorflow/util.go @@ -15,9 +15,9 @@ package tensorflow import ( - commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" corev1 "k8s.io/api/core/v1" + commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" ) diff --git a/pkg/controller.v1/tensorflow/util_test.go b/pkg/controller.v1/tensorflow/util_test.go index 85596f1e67..dd2c8362d3 100644 --- a/pkg/controller.v1/tensorflow/util_test.go +++ b/pkg/controller.v1/tensorflow/util_test.go @@ -22,14 +22,14 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + tftestutil "github.com/kubeflow/training-operator/pkg/controller.v1/tensorflow/testutil" ) func TestGenOwnerReference(t *testing.T) { - testName := "test-tfjob" testUID := uuid.NewUUID() tfJob := &kubeflowv1.TFJob{ ObjectMeta: metav1.ObjectMeta{ - Name: testName, + Name: tftestutil.TestTFJobName, UID: testUID, }, } @@ -38,8 +38,8 @@ func TestGenOwnerReference(t *testing.T) { if ref.UID != testUID { t.Errorf("Expected UID %s, got %s", testUID, ref.UID) } - if ref.Name != testName { - t.Errorf("Expected Name %s, got %s", testName, ref.Name) + if ref.Name != tftestutil.TestTFJobName { + t.Errorf("Expected Name %s, got %s", tftestutil.TestTFJobName, ref.Name) } if ref.APIVersion != kubeflowv1.SchemeGroupVersion.String() { t.Errorf("Expected APIVersion %s, got %s", kubeflowv1.SchemeGroupVersion.String(), ref.APIVersion)