diff --git a/pkg/controller.v1/common/job_test.go b/pkg/controller.v1/common/job_test.go index 75d1d5c09a..b431553ef8 100644 --- a/pkg/controller.v1/common/job_test.go +++ b/pkg/controller.v1/common/job_test.go @@ -17,224 +17,180 @@ limitations under the License. package common import ( - "strconv" + "context" "testing" "time" + "github.com/google/go-cmp/cmp" apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/kubeflow/training-operator/pkg/controller.v1/control" testjobv1 "github.com/kubeflow/training-operator/test_job/apis/test_job/v1" - testjob "github.com/kubeflow/training-operator/test_job/controller.v1/test_job" - "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" ) func TestDeletePodsAndServices(T *testing.T) { - type testCase struct { - cleanPodPolicy apiv1.CleanPodPolicy - deleteRunningPodAndService bool - deleteSucceededPodAndService bool - } - - var testcase = []testCase{ - { - cleanPodPolicy: apiv1.CleanPodPolicyRunning, - deleteRunningPodAndService: true, - deleteSucceededPodAndService: false, + pods := []runtime.Object{ + newPod("runningPod", corev1.PodRunning), + newPod("succeededPod", corev1.PodSucceeded), + } + services := []runtime.Object{ + newService("runningPod"), + newService("succeededPod"), + } + + cases := map[string]struct { + cleanPodPolicy apiv1.CleanPodPolicy + wantPods *corev1.PodList + wantService *corev1.ServiceList + }{ + "CleanPodPolicy is Running": { + cleanPodPolicy: apiv1.CleanPodPolicyRunning, + wantPods: &corev1.PodList{ + Items: []corev1.Pod{ + *pods[1].(*corev1.Pod), + }, + }, + wantService: &corev1.ServiceList{ + Items: []corev1.Service{ + *services[1].(*corev1.Service), + }, + }, }, - { - cleanPodPolicy: apiv1.CleanPodPolicyAll, - deleteRunningPodAndService: true, - deleteSucceededPodAndService: true, + "CleanPodPolicy is All": { + cleanPodPolicy: apiv1.CleanPodPolicyAll, + wantPods: &corev1.PodList{}, + wantService: &corev1.ServiceList{}, }, - { - cleanPodPolicy: apiv1.CleanPodPolicyNone, - deleteRunningPodAndService: false, - deleteSucceededPodAndService: false, + "CleanPodPolicy is None": { + cleanPodPolicy: apiv1.CleanPodPolicyNone, + wantPods: &corev1.PodList{ + Items: []corev1.Pod{ + *pods[0].(*corev1.Pod), + *pods[1].(*corev1.Pod), + }, + }, + wantService: &corev1.ServiceList{ + Items: []corev1.Service{ + *services[0].(*corev1.Service), + *services[1].(*corev1.Service), + }, + }, }, } - - for _, tc := range testcase { - runningPod := newPod("runningPod", corev1.PodRunning) - succeededPod := newPod("succeededPod", corev1.PodSucceeded) - allPods := []*corev1.Pod{runningPod, succeededPod} - runningPodService := newService("runningPod") - succeededPodService := newService("succeededPod") - allServices := []*corev1.Service{runningPodService, succeededPodService} - - testJobController := testjob.TestJobController{ - Pods: allPods, - Services: allServices, - } - - fakePodControl := &control.FakePodControl{} - fakeServiceControl := &control.FakeServiceControl{} - - mainJobController := JobController{ - Controller: &testJobController, - PodControl: fakePodControl, - ServiceControl: fakeServiceControl, - } - runPolicy := apiv1.RunPolicy{ - CleanPodPolicy: &tc.cleanPodPolicy, - } - - job := &testjobv1.TestJob{} - err := mainJobController.DeletePodsAndServices(&runPolicy, job, allPods) - - if assert.NoError(T, err) { - if tc.deleteRunningPodAndService { - // 
should delete the running pod and its service - assert.Contains(T, fakePodControl.DeletePodName, runningPod.Name) - assert.Contains(T, fakeServiceControl.DeleteServiceName, runningPodService.Name) - } else { - // should NOT delete the running pod and its service - assert.NotContains(T, fakePodControl.DeletePodName, runningPod.Name) - assert.NotContains(T, fakeServiceControl.DeleteServiceName, runningPodService.Name) + for name, tc := range cases { + T.Run(name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset(append(pods, services...)...) + jobController := JobController{ + PodControl: control.RealPodControl{KubeClient: fakeClient, Recorder: &record.FakeRecorder{}}, + ServiceControl: control.RealServiceControl{KubeClient: fakeClient, Recorder: &record.FakeRecorder{}}, } - if tc.deleteSucceededPodAndService { - // should delete the SUCCEEDED pod and its service - assert.Contains(T, fakePodControl.DeletePodName, succeededPod.Name) - assert.Contains(T, fakeServiceControl.DeleteServiceName, succeededPodService.Name) - } else { - // should NOT delete the SUCCEEDED pod and its service - assert.NotContains(T, fakePodControl.DeletePodName, succeededPod.Name) - assert.NotContains(T, fakeServiceControl.DeleteServiceName, succeededPodService.Name) + var inPods []*corev1.Pod + for i := range pods { + inPods = append(inPods, pods[i].(*corev1.Pod)) + } + if err := jobController.DeletePodsAndServices(&apiv1.RunPolicy{ + CleanPodPolicy: &tc.cleanPodPolicy, + }, &testjobv1.TestJob{}, inPods); err != nil { + T.Errorf("Failed to delete pods and services: %v", err) + } + gotPods, err := fakeClient.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{}) + if err != nil { + t.Errorf("Failed to list pods: %v", err) + } + if diff := cmp.Diff(tc.wantPods, gotPods); len(diff) != 0 { + t.Errorf("Unexpected pods after running DeletePodsAndServices (-want,+got):%s\n", diff) } - } + gotServices, err := fakeClient.CoreV1().Services("").List(context.Background(), metav1.ListOptions{}) + if err != nil { + t.Errorf("Failed to list services: %v", err) + } + if diff := cmp.Diff(tc.wantService, gotServices); len(diff) != 0 { + t.Errorf("Unexpected services after running DeletePodsAndServices (-want,+got):%s\n", diff) + } + }) } } func TestPastBackoffLimit(T *testing.T) { - type testCase struct { - backOffLimit int32 - shouldPassBackoffLimit bool - } - - var testcase = []testCase{ - { - backOffLimit: int32(0), - shouldPassBackoffLimit: false, - }, - } - - for _, tc := range testcase { - runningPod := newPod("runningPod", corev1.PodRunning) - succeededPod := newPod("succeededPod", corev1.PodSucceeded) - allPods := []*corev1.Pod{runningPod, succeededPod} - - testJobController := testjob.TestJobController{ - Pods: allPods, - } - - mainJobController := JobController{ - Controller: &testJobController, - } - runPolicy := apiv1.RunPolicy{ - BackoffLimit: &tc.backOffLimit, - } - - result, err := mainJobController.PastBackoffLimit("fake-job", &runPolicy, nil, allPods) - - if assert.NoError(T, err) { - assert.Equal(T, result, tc.shouldPassBackoffLimit) - } - } -} - -func TestPastActiveDeadline(T *testing.T) { - type testCase struct { - activeDeadlineSeconds int64 - shouldPassActiveDeadline bool - } - - var testcase = []testCase{ - { - activeDeadlineSeconds: int64(0), - shouldPassActiveDeadline: true, + backoffLimitExceededPod := newPod("runningPodWithBackoff", corev1.PodRunning) + backoffLimitExceededPod.Status.ContainerStatuses = []corev1.ContainerStatus{ + {RestartCount: 3}, + } + allPods := []*corev1.Pod{ + 
newPod("runningPod", corev1.PodRunning), + newPod("succeededPod", corev1.PodSucceeded), + backoffLimitExceededPod, + } + cases := map[string]struct { + pods []*corev1.Pod + backOffLimit int32 + wantPastBackOffLimit bool + }{ + "backOffLimit is 0": { + pods: allPods[:2], + backOffLimit: 0, + wantPastBackOffLimit: false, }, - { - activeDeadlineSeconds: int64(2), - shouldPassActiveDeadline: false, + "backOffLimit is 3": { + pods: allPods, + backOffLimit: 3, + wantPastBackOffLimit: true, }, } - - for _, tc := range testcase { - - testJobController := testjob.TestJobController{} - - mainJobController := JobController{ - Controller: &testJobController, - } - runPolicy := apiv1.RunPolicy{ - ActiveDeadlineSeconds: &tc.activeDeadlineSeconds, - } - jobStatus := apiv1.JobStatus{ - StartTime: &metav1.Time{ - Time: time.Now(), - }, - } - - result := mainJobController.PastActiveDeadline(&runPolicy, jobStatus) - assert.Equal( - T, result, tc.shouldPassActiveDeadline, - "Result is not expected for activeDeadlineSeconds == "+strconv.FormatInt(tc.activeDeadlineSeconds, 10)) + for name, tc := range cases { + T.Run(name, func(t *testing.T) { + jobController := JobController{} + runPolicy := &apiv1.RunPolicy{ + BackoffLimit: &tc.backOffLimit, + } + replica := map[apiv1.ReplicaType]*apiv1.ReplicaSpec{ + "test": {RestartPolicy: apiv1.RestartPolicyOnFailure}, + } + got, err := jobController.PastBackoffLimit("test-job", runPolicy, replica, tc.pods) + if err != nil { + t.Errorf("Failaed to do PastBackoffLimit: %v", err) + } + if tc.wantPastBackOffLimit != got { + t.Errorf("Unexpected pastBackoffLimit: \nwant: %v\ngot: %v\n", tc.wantPastBackOffLimit, got) + } + }) } } -func TestCleanupJobIfTTL(T *testing.T) { - ttl := int32(0) - runPolicy := apiv1.RunPolicy{ - TTLSecondsAfterFinished: &ttl, - } - oneDayAgo := time.Now() - // one day ago - _ = oneDayAgo.AddDate(0, 0, -1) - jobStatus := apiv1.JobStatus{ - CompletionTime: &metav1.Time{ - Time: oneDayAgo, +func TestPastActiveDeadline(T *testing.T) { + cases := map[string]struct { + activeDeadlineSeconds int64 + wantPastActiveDeadlineSeconds bool + }{ + "activeDeadlineSeconds is 0": { + activeDeadlineSeconds: 0, + wantPastActiveDeadlineSeconds: true, }, - } - - testJobController := &testjob.TestJobController{ - Job: &testjobv1.TestJob{}, - } - mainJobController := JobController{ - Controller: testJobController, - } - - var job interface{} - err := mainJobController.CleanupJob(&runPolicy, jobStatus, job) - if assert.NoError(T, err) { - // job field is zeroed - assert.Empty(T, testJobController.Job) - } -} - -func TestCleanupJob(T *testing.T) { - ttl := int32(0) - runPolicy := apiv1.RunPolicy{ - TTLSecondsAfterFinished: &ttl, - } - jobStatus := apiv1.JobStatus{ - CompletionTime: &metav1.Time{ - Time: time.Now(), + "activeDeadlineSeconds is 2": { + activeDeadlineSeconds: 2, + wantPastActiveDeadlineSeconds: false, }, } - - testJobController := &testjob.TestJobController{ - Job: &testjobv1.TestJob{}, - } - mainJobController := JobController{ - Controller: testJobController, - } - - var job interface{} - err := mainJobController.CleanupJob(&runPolicy, jobStatus, job) - if assert.NoError(T, err) { - assert.Empty(T, testJobController.Job) + for name, tc := range cases { + T.Run(name, func(t *testing.T) { + jobController := JobController{} + runPolicy := &apiv1.RunPolicy{ + ActiveDeadlineSeconds: &tc.activeDeadlineSeconds, + } + jobStatus := apiv1.JobStatus{ + StartTime: &metav1.Time{ + Time: time.Now(), + }, + } + if got := jobController.PastActiveDeadline(runPolicy, jobStatus); 
tc.wantPastActiveDeadlineSeconds != got { + t.Errorf("Unexpected PastActiveDeadline: \nwant: %v\ngot: %v\n", tc.wantPastActiveDeadlineSeconds, got) + } + }) } } @@ -242,6 +198,9 @@ func newPod(name string, phase corev1.PodPhase) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, + Labels: map[string]string{ + apiv1.ReplicaTypeLabel: "test", + }, }, Status: corev1.PodStatus{ Phase: phase, diff --git a/pkg/controller.v1/common/service.go b/pkg/controller.v1/common/service.go index 2a6ede6db9..7a7d371546 100644 --- a/pkg/controller.v1/common/service.go +++ b/pkg/controller.v1/common/service.go @@ -206,7 +206,7 @@ func (jc *JobController) GetPortsFromJob(spec *apiv1.ReplicaSpec) (map[string]in return core.GetPortsFromJob(spec, jc.Controller.GetDefaultContainerName()) } -// createNewService creates a new service for the given index and type. +// CreateNewService creates a new service for the given index and type. func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.ReplicaType, spec *apiv1.ReplicaSpec, index string) error { jobKey, err := KeyFunc(job) diff --git a/pkg/controller.v1/common/service_test.go b/pkg/controller.v1/common/service_test.go index 7d462dbb10..cb35122715 100644 --- a/pkg/controller.v1/common/service_test.go +++ b/pkg/controller.v1/common/service_test.go @@ -3,25 +3,13 @@ package common import ( "testing" - "github.com/kubeflow/training-operator/pkg/common" "github.com/kubeflow/training-operator/pkg/core" apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "github.com/kubeflow/training-operator/pkg/controller.v1/control" - "github.com/kubeflow/training-operator/pkg/controller.v1/expectation" - testjobv1 "github.com/kubeflow/training-operator/test_job/apis/test_job/v1" - testjob "github.com/kubeflow/training-operator/test_job/controller.v1/test_job" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kubeclientset "k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" - schedulinglisters "k8s.io/client-go/listers/scheduling/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" - volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" ) func TestCalculateServiceSliceSize(t *testing.T) { @@ -82,133 +70,6 @@ func TestCalculateServiceSliceSize(t *testing.T) { } } -func TestJobController_CreateNewService(t *testing.T) { - type fields struct { - Controller common.ControllerInterface - Config JobControllerConfiguration - PodControl control.PodControlInterface - ServiceControl control.ServiceControlInterface - KubeClientSet kubeclientset.Interface - VolcanoClientSet volcanoclient.Interface - PodLister corelisters.PodLister - ServiceLister corelisters.ServiceLister - PriorityClassLister schedulinglisters.PriorityClassLister - PodInformerSynced cache.InformerSynced - ServiceInformerSynced cache.InformerSynced - PriorityClassInformerSynced cache.InformerSynced - Expectations expectation.ControllerExpectationsInterface - WorkQueue workqueue.RateLimitingInterface - Recorder record.EventRecorder - } - type args struct { - job metav1.Object - rtype apiv1.ReplicaType - spec *apiv1.ReplicaSpec - index string - } - - var replicas int32 = 2 - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - {name: "test0", - fields: fields{ - Controller: &testjob.TestJobController{}, - Expectations: expectation.NewControllerExpectations(), - 
ServiceControl: &control.FakeServiceControl{}, - }, - args: args{ - job: &testjobv1.TestJob{}, - rtype: "Worker", - spec: &apiv1.ReplicaSpec{ - Replicas: &replicas, - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - v1.Container{ - Name: "default-container", - Ports: []v1.ContainerPort{ - v1.ContainerPort{ - Name: "test", - ContainerPort: 8080, - }, - v1.ContainerPort{ - Name: "default-port-name", - ContainerPort: 2222, - }, - }, - }, - }, - }, - }, - }, - index: "0", - }, - wantErr: false, - }, - {name: "test1", - fields: fields{ - Controller: &testjob.TestJobController{}, - Expectations: expectation.NewControllerExpectations(), - ServiceControl: &control.FakeServiceControl{}, - }, - args: args{ - job: &testjobv1.TestJob{}, - rtype: "Master", - spec: &apiv1.ReplicaSpec{ - Replicas: &replicas, - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - v1.Container{ - Name: "fake-container", - Ports: []v1.ContainerPort{ - v1.ContainerPort{ - Name: "test", - ContainerPort: 8080, - }, - v1.ContainerPort{ - Name: "default-port-name", - ContainerPort: 2222, - }, - }, - }, - }, - }, - }, - }, - index: "0", - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - jc := &JobController{ - Controller: tt.fields.Controller, - Config: tt.fields.Config, - PodControl: tt.fields.PodControl, - ServiceControl: tt.fields.ServiceControl, - KubeClientSet: tt.fields.KubeClientSet, - PodLister: tt.fields.PodLister, - ServiceLister: tt.fields.ServiceLister, - PriorityClassLister: tt.fields.PriorityClassLister, - PodInformerSynced: tt.fields.PodInformerSynced, - ServiceInformerSynced: tt.fields.ServiceInformerSynced, - PriorityClassInformerSynced: tt.fields.PriorityClassInformerSynced, - Expectations: tt.fields.Expectations, - WorkQueue: tt.fields.WorkQueue, - Recorder: tt.fields.Recorder, - } - if err := jc.CreateNewService(tt.args.job, tt.args.rtype, tt.args.spec, tt.args.index); (err != nil) != tt.wantErr { - t.Errorf("JobController.CreateNewService() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} func TestFilterServicesForReplicaType(t *testing.T) { services := []*v1.Service{ { diff --git a/pkg/controller.v1/tensorflow/job_test.go b/pkg/controller.v1/tensorflow/job_test.go index 5a75a66bb2..b48e0bc4c7 100644 --- a/pkg/controller.v1/tensorflow/job_test.go +++ b/pkg/controller.v1/tensorflow/job_test.go @@ -17,8 +17,10 @@ package tensorflow import ( "context" "fmt" + "strconv" "time" + "github.com/google/go-cmp/cmp/cmpopts" commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/kubeflow/training-operator/pkg/controller.v1/common" commonutil "github.com/kubeflow/training-operator/pkg/util" @@ -28,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" @@ -608,5 +611,198 @@ var _ = Describe("TFJob controller", func() { } }) }) +}) + +var _ = Describe("Test for controller.v1/common", func() { + var ( + ctx = context.Background() + ns *corev1.Namespace + now metav1.Time + ) + BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "tfjob-ns-", + }, + } + now = metav1.Now() + Expect(testK8sClient.Create(ctx, ns)).Should(Succeed()) + }) + AfterEach(func() { + Expect(testK8sClient.Delete(ctx, ns)).Should(Succeed()) + }) + 
type cleanUpCases struct {
+		tfJob              *kubeflowv1.TFJob
+		runPolicy          *kubeflowv1.RunPolicy
+		jobStatus          kubeflowv1.JobStatus
+		wantTFJobIsRemoved bool
+		wantErr            bool
+	}
+	DescribeTable("TFJob is created and is cleaned up",
+		func(tc *cleanUpCases) {
+			tc.tfJob.SetNamespace(ns.Name)
+			Expect(testK8sClient.Create(ctx, tc.tfJob)).Should(Succeed())
+
+			if tc.wantErr {
+				Expect(reconciler.CleanupJob(tc.runPolicy, tc.jobStatus, tc.tfJob)).ShouldNot(Succeed())
+			} else {
+				Expect(reconciler.CleanupJob(tc.runPolicy, tc.jobStatus, tc.tfJob)).Should(Succeed())
+			}
+			if tc.wantTFJobIsRemoved {
+				Eventually(func() bool {
+					gotErr := testK8sClient.Get(ctx, client.ObjectKeyFromObject(tc.tfJob), &kubeflowv1.TFJob{})
+					return errors.IsNotFound(gotErr)
+				}).Should(BeTrue())
+			} else {
+				Eventually(func() error {
+					return testK8sClient.Get(ctx, client.ObjectKeyFromObject(tc.tfJob), &kubeflowv1.TFJob{})
+				}).Should(BeNil())
+			}
+		},
+		Entry("TFJob shouldn't be removed since TTL is nil", &cleanUpCases{
+			tfJob: testutil.NewTFJobWithCleanupJobDelay(1, 2, 0, nil),
+			runPolicy: &kubeflowv1.RunPolicy{
+				TTLSecondsAfterFinished: nil,
+			},
+			jobStatus:          kubeflowv1.JobStatus{},
+			wantTFJobIsRemoved: false,
+			wantErr:            false,
+		}),
+		Entry("An error occurs since completionTime is nil", &cleanUpCases{
+			tfJob: testutil.NewTFJobWithCleanupJobDelay(1, 2, 0, pointer.Int32(10)),
+			runPolicy: &kubeflowv1.RunPolicy{
+				TTLSecondsAfterFinished: pointer.Int32(10),
+			},
+			jobStatus: kubeflowv1.JobStatus{
+				CompletionTime: nil,
+			},
+			wantTFJobIsRemoved: false,
+			wantErr:            true,
+		}),
+		Entry("TFJob is removed since it exceeded the TTL (TTL is 180s)", &cleanUpCases{
+			tfJob: testutil.NewTFJobWithCleanupJobDelay(1, 2, 0, pointer.Int32(180)),
+			runPolicy: &kubeflowv1.RunPolicy{
+				TTLSecondsAfterFinished: pointer.Int32(180),
+			},
+			jobStatus: kubeflowv1.JobStatus{
+				CompletionTime: &metav1.Time{
+					Time: now.AddDate(0, 0, -1),
+				},
+			},
+			wantTFJobIsRemoved: true,
+			wantErr:            false,
+		}),
+		Entry("TFJob is removed immediately (TTL is 0s)", &cleanUpCases{
+			tfJob: testutil.NewTFJobWithCleanupJobDelay(1, 2, 0, pointer.Int32(0)),
+			runPolicy: &kubeflowv1.RunPolicy{
+				TTLSecondsAfterFinished: pointer.Int32(0),
+			},
+			jobStatus: kubeflowv1.JobStatus{
+				CompletionTime: &now,
+			},
+			wantTFJobIsRemoved: true,
+			wantErr:            false,
+		}),
+	)
+
+	type createServiceCases struct {
+		tfJob   *kubeflowv1.TFJob
+		rType   kubeflowv1.ReplicaType
+		spec    *kubeflowv1.ReplicaSpec
+		uid     types.UID
+		index   int
+		wantErr bool
+	}
+	DescribeTable("CreateNewService",
+		func(tc *createServiceCases) {
+			tc.tfJob.SetUID(tc.uid)
+			tc.tfJob.SetNamespace(ns.Name)
+
+			gotErr := reconciler.CreateNewService(tc.tfJob, tc.rType, tc.spec, strconv.Itoa(tc.index))
+			if tc.wantErr {
+				Expect(gotErr).ShouldNot(Succeed())
+			} else {
+				Expect(gotErr).Should(Succeed())
+
+				svcInternalTPC := corev1.ServiceInternalTrafficPolicyCluster
+				svcSingleStack := corev1.IPFamilyPolicySingleStack
+				wantSvc := &corev1.Service{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      fmt.Sprintf("%s-%s-%d", tc.tfJob.Name, tc.rType, tc.index),
+						Namespace: ns.Name,
+						OwnerReferences: []metav1.OwnerReference{
+							*reconciler.GenOwnerReference(tc.tfJob),
+						},
+						Labels: map[string]string{
+							kubeflowv1.JobNameLabel:      tc.tfJob.Name,
+							kubeflowv1.OperatorNameLabel: controllerName,
+							kubeflowv1.ReplicaIndexLabel: strconv.Itoa(tc.index),
+							kubeflowv1.ReplicaTypeLabel:  "",
+						},
+					},
+					Spec: corev1.ServiceSpec{
+						Ports: []corev1.ServicePort{
+							{
+								Name:     kubeflowv1.TFJobDefaultPortName,
+								Protocol: corev1.ProtocolTCP,
+								Port:     
kubeflowv1.TFJobDefaultPort,
+								TargetPort: intstr.IntOrString{
+									IntVal: kubeflowv1.TFJobDefaultPort,
+								},
+							},
+						},
+						Selector: map[string]string{
+							kubeflowv1.JobNameLabel:      tc.tfJob.Name,
+							kubeflowv1.OperatorNameLabel: controllerName,
+							kubeflowv1.ReplicaIndexLabel: strconv.Itoa(tc.index),
+							kubeflowv1.ReplicaTypeLabel:  "",
+						},
+						ClusterIP:             corev1.ClusterIPNone,
+						Type:                  corev1.ServiceTypeClusterIP,
+						ClusterIPs:            []string{corev1.ClusterIPNone},
+						SessionAffinity:       corev1.ServiceAffinityNone,
+						IPFamilies:            []corev1.IPFamily{corev1.IPv4Protocol},
+						IPFamilyPolicy:        &svcSingleStack,
+						InternalTrafficPolicy: &svcInternalTPC,
+					},
+				}
+				Eventually(func() *corev1.Service {
+					svc := &corev1.Service{}
+					Expect(testK8sClient.Get(ctx, client.ObjectKeyFromObject(wantSvc), svc)).Should(Succeed())
+					return svc
+				}).Should(BeComparableTo(wantSvc,
+					cmpopts.IgnoreFields(metav1.ObjectMeta{}, "UID", "ResourceVersion", "Generation", "CreationTimestamp", "ManagedFields")))
+			}
+		},
+		Entry("Failed to create service since containerPort is missing", &createServiceCases{
+			tfJob: testutil.NewTFJobV2(2, 0, 0, 1, 0),
+			spec: &kubeflowv1.ReplicaSpec{
+				Template: corev1.PodTemplateSpec{
+					Spec: corev1.PodSpec{
+						Containers: []corev1.Container{
+							{
+								Name: kubeflowv1.TFJobDefaultContainerName,
+							},
+						},
+					},
+				},
+			},
+			index:   0,
+			wantErr: true,
+		}),
+		Entry("Failed to create service since Job's ownerReference is invalid", &createServiceCases{
+			tfJob:   testutil.NewTFJobV2(2, 0, 0, 1, 0),
+			spec:    &kubeflowv1.ReplicaSpec{Template: testutil.NewTFReplicaSpecTemplate()},
+			index:   1,
+			wantErr: true,
+		}),
+		Entry("Succeeded in creating the service", &createServiceCases{
+			tfJob:   testutil.NewTFJobV2(2, 0, 0, 1, 0),
+			spec:    &kubeflowv1.ReplicaSpec{Template: testutil.NewTFReplicaSpecTemplate()},
+			index:   0,
+			wantErr: false,
+			uid:     uuid.NewUUID(),
+		}),
+	)
 })
diff --git a/pkg/controller.v1/tensorflow/tfjob_controller_test.go b/pkg/controller.v1/tensorflow/tfjob_controller_test.go
index cdfba2efcb..f1ab3d0c12 100644
--- a/pkg/controller.v1/tensorflow/tfjob_controller_test.go
+++ b/pkg/controller.v1/tensorflow/tfjob_controller_test.go
@@ -17,8 +17,6 @@ package tensorflow
 import (
 	"context"
 	"fmt"
-
-	commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	. "github.com/onsi/ginkgo/v2"
 	. 
"github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" @@ -26,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "sigs.k8s.io/controller-runtime/pkg/client" + commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/kubeflow/training-operator/pkg/common/util/v1/testutil" ) diff --git a/test_job/controller.v1/test_job/test_job_controller.go b/test_job/controller.v1/test_job/test_job_controller.go deleted file mode 100644 index 17e1c289e0..0000000000 --- a/test_job/controller.v1/test_job/test_job_controller.go +++ /dev/null @@ -1,75 +0,0 @@ -package test_job - -import ( - commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "github.com/kubeflow/training-operator/pkg/common" - v1 "github.com/kubeflow/training-operator/test_job/apis/test_job/v1" - log "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" -) - -var _ common.ControllerInterface = &TestJobController{} - -type TestJobController struct { - common.ControllerInterface - Job *v1.TestJob - Pods []*corev1.Pod - Services []*corev1.Service -} - -func (TestJobController) ControllerName() string { - return "test-operator" -} - -func (TestJobController) GetAPIGroupVersionKind() schema.GroupVersionKind { - return v1.SchemeGroupVersionKind -} - -func (TestJobController) GetAPIGroupVersion() schema.GroupVersion { - return v1.SchemeGroupVersion -} - -func (TestJobController) GetGroupNameLabelValue() string { - return v1.GroupName -} - -func (TestJobController) GetDefaultContainerPortName() string { - return "default-port-name" -} - -func (t *TestJobController) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) { - return t.Job, nil -} - -func (t *TestJobController) GetJobFromAPIClient(namespace, name string) (metav1.Object, error) { - return t.Job, nil -} - -func (t *TestJobController) DeleteJob(job interface{}) error { - log.Info("Delete job") - t.Job = nil - return nil -} - -func (t *TestJobController) UpdateJobStatus(job interface{}, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, - jobStatus *commonv1.JobStatus) error { - return nil -} - -func (t *TestJobController) UpdateJobStatusInApiServer(job interface{}, jobStatus *commonv1.JobStatus) error { - return nil -} - -func (t *TestJobController) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error { - return nil -} - -func (t *TestJobController) GetDefaultContainerName() string { - return "default-container" -} - -func (t *TestJobController) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool { - return true -}