From 4dd0d0909210edc7e1a31a5987cb32dc834d682b Mon Sep 17 00:00:00 2001
From: Yuki Iwai
Date: Wed, 2 Aug 2023 10:24:39 +0900
Subject: [PATCH] Implement integration test for MPIJob v1 related to suspend semantics (#1875)

Signed-off-by: Yuki Iwai
---
 pkg/controller.v1/mpi/mpijob_controller.go  |   2 +-
 .../mpi/mpijob_controller_test.go           | 259 ++++++++++++++++++
 .../xgboost/xgboostjob_controller_test.go   |   2 +-
 3 files changed, 261 insertions(+), 2 deletions(-)

diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go
index 049c9ae55a..43834b25ab 100644
--- a/pkg/controller.v1/mpi/mpijob_controller.go
+++ b/pkg/controller.v1/mpi/mpijob_controller.go
@@ -317,7 +317,7 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
 		}
 
 		jc.Scheme.Default(mpiJob)
-		msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName())
+		msg := fmt.Sprintf("MPIJob %s is created.", e.Object.GetName())
 		logrus.Info(msg)
 		trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 		commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg)
diff --git a/pkg/controller.v1/mpi/mpijob_controller_test.go b/pkg/controller.v1/mpi/mpijob_controller_test.go
index 78d3c1bdac..295f244f3c 100644
--- a/pkg/controller.v1/mpi/mpijob_controller_test.go
+++ b/pkg/controller.v1/mpi/mpijob_controller_test.go
@@ -23,13 +23,16 @@ import (
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/utils/pointer"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
+	commonutil "github.com/kubeflow/training-operator/pkg/util"
 	"github.com/kubeflow/training-operator/pkg/util/testutil"
 )
 
@@ -787,6 +790,262 @@ var _ = Describe("MPIJob controller", func() {
 			}
 		})
 	})
+
+	Context("When creating the MPIJob with the suspend semantics", func() {
+		const name = "test-job"
+		var (
+			ns          *corev1.Namespace
+			job         *kubeflowv1.MPIJob
+			jobKey      types.NamespacedName
+			launcherKey types.NamespacedName
+			worker0Key  types.NamespacedName
+			ctx         = context.Background()
+		)
+		BeforeEach(func() {
+			ns = &corev1.Namespace{
+				ObjectMeta: metav1.ObjectMeta{
+					GenerateName: "mpijob-test-",
+				},
+			}
+			Expect(testK8sClient.Create(ctx, ns)).Should(Succeed())
+
+			now := metav1.Now()
+			job = newMPIJob(name, pointer.Int32(1), 1, gpuResourceName, &now, &now)
+			job.Namespace = ns.Name
+			jobKey = client.ObjectKeyFromObject(job)
+			launcherKey = types.NamespacedName{
+				Name:      fmt.Sprintf("%s-launcher", name),
+				Namespace: ns.Name,
+			}
+			worker0Key = types.NamespacedName{
+				Name:      fmt.Sprintf("%s-worker-0", name),
+				Namespace: ns.Name,
+			}
+		})
+		AfterEach(func() {
+			Expect(testK8sClient.Delete(ctx, job)).Should(Succeed())
+			Expect(testK8sClient.Delete(ctx, ns)).Should(Succeed())
+		})
+		It("Shouldn't create resources if MPIJob is suspended", func() {
+			By("By creating a new MPIJob with suspend=true")
+			job.Spec.RunPolicy.Suspend = pointer.Bool(true)
+			Expect(testK8sClient.Create(ctx, job)).Should(Succeed())
+
+			created := &kubeflowv1.MPIJob{}
+			launcherPod := &corev1.Pod{}
+			workerPod := &corev1.Pod{}
+
+			By("Checking created MPIJob")
+			Eventually(func() bool {
+				err := testK8sClient.Get(ctx, jobKey, created)
+				return err == nil
+			}, testutil.Timeout, testutil.Interval).Should(BeTrue())
+			By("Checking created MPIJob has a nil startTime")
+			Consistently(func() *metav1.Time {
+				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
+				return created.Status.StartTime
+			}, testutil.ConsistentDuration, testutil.Interval).Should(BeNil())
+
+			By("Checking if the pods aren't created")
+			Consistently(func() bool {
+				errLauncherPod := testK8sClient.Get(ctx, launcherKey, launcherPod)
+				errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod)
+				return errors.IsNotFound(errLauncherPod) && errors.IsNotFound(errWorkerPod)
+			}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue())
+
+			By("Checking if the MPIJob has a suspended condition")
+			Eventually(func() []kubeflowv1.JobCondition {
+				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
+				return created.Status.Conditions
+			}, testutil.ConsistentDuration, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{
+				{
+					Type:    kubeflowv1.JobCreated,
+					Status:  corev1.ConditionTrue,
+					Reason:  commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason),
+					Message: fmt.Sprintf("MPIJob %s is created.", name),
+				},
+				{
+					Type:    kubeflowv1.JobSuspended,
+					Status:  corev1.ConditionTrue,
+					Reason:  commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSuspendedReason),
+					Message: fmt.Sprintf("MPIJob %s is suspended.", name),
+				},
+			}, testutil.IgnoreJobConditionsTimes))
+		})
+
+		It("Should delete resources after MPIJob is suspended; Should resume MPIJob after MPIJob is unsuspended", func() {
+			By("By creating a new MPIJob")
+			Expect(testK8sClient.Create(ctx, job)).Should(Succeed())
+
+			created := &kubeflowv1.MPIJob{}
+			launcherPod := &corev1.Pod{}
+			workerPod := &corev1.Pod{}
+
+			// We'll need to retry getting this newly created MPIJob, given that creation may not immediately happen.
+ By("Checking created MPIJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + var startTimeBeforeSuspended *metav1.Time + Eventually(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + startTimeBeforeSuspended = created.Status.StartTime + return startTimeBeforeSuspended + }, testutil.Timeout, testutil.Interval).ShouldNot(BeNil()) + + By("Checking the created pods") + Eventually(func() bool { + errLauncher := testK8sClient.Get(ctx, launcherKey, launcherPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errLauncher == nil && errWorker == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Updating the Pod's phase with Running") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, launcherKey, launcherPod)).Should(Succeed()) + launcherPod.Status.Phase = corev1.PodRunning + return testK8sClient.Status().Update(ctx, launcherPod) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + Eventually(func() error { + Expect(testK8sClient.Get(ctx, worker0Key, workerPod)).Should(Succeed()) + workerPod.Status.Phase = corev1.PodRunning + return testK8sClient.Status().Update(ctx, workerPod) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking the MPIJob's condition") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("MPIJob %s is created.", name), + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), + Message: fmt.Sprintf("MPIJob %s is running.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + + By("Updating the MPIJob with suspend=true") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = pointer.Bool(true) + return testK8sClient.Update(ctx, created) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking if the pods are removed") + Eventually(func() bool { + errLauncher := testK8sClient.Get(ctx, launcherKey, launcherPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errors.IsNotFound(errLauncher) && errors.IsNotFound(errWorker) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + errLauncherPod := testK8sClient.Get(ctx, launcherKey, launcherPod) + errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod) + return errors.IsNotFound(errLauncherPod) && errors.IsNotFound(errWorkerPod) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + + By("Checking if the MPIJob has a suspended condition") + Eventually(func() bool { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.ReplicaStatuses[kubeflowv1.MPIJobReplicaTypeLauncher].Active == 0 && + created.Status.ReplicaStatuses[kubeflowv1.MPIJobReplicaTypeWorker].Active == 0 && + created.Status.StartTime.Equal(startTimeBeforeSuspended) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + 
+				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
+				return created.Status.ReplicaStatuses[kubeflowv1.MPIJobReplicaTypeLauncher].Active == 0 &&
+					created.Status.ReplicaStatuses[kubeflowv1.MPIJobReplicaTypeWorker].Active == 0 &&
+					created.Status.StartTime.Equal(startTimeBeforeSuspended)
+			}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue())
+			Expect(created.Status.Conditions).Should(BeComparableTo([]kubeflowv1.JobCondition{
+				{
+					Type:    kubeflowv1.JobCreated,
+					Status:  corev1.ConditionTrue,
+					Reason:  commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason),
+					Message: fmt.Sprintf("MPIJob %s is created.", name),
+				},
+				{
+					Type:    kubeflowv1.JobRunning,
+					Status:  corev1.ConditionFalse,
+					Reason:  commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSuspendedReason),
+					Message: fmt.Sprintf("MPIJob %s is suspended.", name),
+				},
+				{
+					Type:    kubeflowv1.JobSuspended,
+					Reason:  commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSuspendedReason),
+					Message: fmt.Sprintf("MPIJob %s is suspended.", name),
+					Status:  corev1.ConditionTrue,
+				},
+			}, testutil.IgnoreJobConditionsTimes))
+
+			By("Unsuspending the MPIJob")
+			Eventually(func() error {
+				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
+				created.Spec.RunPolicy.Suspend = pointer.Bool(false)
+				return testK8sClient.Update(ctx, created)
+			}, testutil.Timeout, testutil.Interval).Should(Succeed())
+			Eventually(func() *metav1.Time {
+				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
+				return created.Status.StartTime
+			}, testutil.Timeout, testutil.Interval).ShouldNot(BeNil())
+
+			By("Checking if the pods are created")
+			Eventually(func() error {
+				return testK8sClient.Get(ctx, launcherKey, launcherPod)
+			}, testutil.Timeout, testutil.Interval).Should(BeNil())
+			Eventually(func() error {
+				return testK8sClient.Get(ctx, worker0Key, workerPod)
+			}, testutil.Timeout, testutil.Interval).Should(BeNil())
+
+			By("Updating the Pod's phase with Running")
+			Eventually(func() error {
+				Expect(testK8sClient.Get(ctx, launcherKey, launcherPod)).Should(Succeed())
+				launcherPod.Status.Phase = corev1.PodRunning
+				return testK8sClient.Status().Update(ctx, launcherPod)
+			}, testutil.Timeout, testutil.Interval).Should(Succeed())
+			Eventually(func() error {
+				Expect(testK8sClient.Get(ctx, worker0Key, workerPod)).Should(Succeed())
+				workerPod.Status.Phase = corev1.PodRunning
+				return testK8sClient.Status().Update(ctx, workerPod)
+			}, testutil.Timeout, testutil.Interval).Should(Succeed())
+
+			By("Checking if the MPIJob has resumed conditions")
+			Eventually(func() []kubeflowv1.JobCondition {
+				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
+				return created.Status.Conditions
+			}, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{
+				{
+					Type:    kubeflowv1.JobCreated,
+					Status:  corev1.ConditionTrue,
+					Reason:  commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason),
+					Message: fmt.Sprintf("MPIJob %s is created.", name),
+				},
+				{
+					Type:    kubeflowv1.JobSuspended,
+					Reason:  commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobResumedReason),
+					Message: fmt.Sprintf("MPIJob %s is resumed.", name),
+					Status:  corev1.ConditionFalse,
+				},
+				{
+					Type:    kubeflowv1.JobRunning,
+					Status:  corev1.ConditionTrue,
+					Reason:  commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason),
+					Message: fmt.Sprintf("MPIJob %s is running.", name),
+				},
+			}, testutil.IgnoreJobConditionsTimes))
+
+			By("Checking if the startTime is updated")
+			Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended))
+		})
+	})
 })
 
 func ReplicaStatusMatch(replicaStatuses map[common.ReplicaType]*common.ReplicaStatus,
diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller_test.go b/pkg/controller.v1/xgboost/xgboostjob_controller_test.go
index 468af95e8a..e5894eb385 100644
--- a/pkg/controller.v1/xgboost/xgboostjob_controller_test.go
+++ b/pkg/controller.v1/xgboost/xgboostjob_controller_test.go
@@ -37,7 +37,7 @@ var _ = Describe("XGBoost controller", func() {
 	const (
 		expectedPort = int32(9999)
 	)
-	Context("", func() {
+	Context("When creating the XGBoostJob", func() {
 		const name = "test-job"
 		var (
 			ns *corev1.Namespace