From bbec743d6b667def94305c81c8493daac3118556 Mon Sep 17 00:00:00 2001 From: bjwswang Date: Fri, 15 Dec 2023 07:36:43 +0000 Subject: [PATCH] fix: worker's updatetime refreshed unexpectedly Signed-off-by: bjwswang --- api/base/v1alpha1/worker.go | 29 +++++++++++++-- controllers/worker_controller.go | 62 +++++++++++++++++++------------- pkg/worker/worker.go | 35 ++++++++++++------ 3 files changed, 90 insertions(+), 36 deletions(-) diff --git a/api/base/v1alpha1/worker.go b/api/base/v1alpha1/worker.go index 4e4f52d74..d91d1fbfe 100644 --- a/api/base/v1alpha1/worker.go +++ b/api/base/v1alpha1/worker.go @@ -68,17 +68,32 @@ func (worker Worker) MakeRegistrationModelName() string { } func (worker Worker) PendingCondition() Condition { + currCon := worker.Status.GetCondition(TypeReady) + // return current condition if condition not changed + if currCon.Status == corev1.ConditionFalse && currCon.Reason == "Pending" { + return currCon + } + // keep original LastSuccessfulTime if have + lastSuccessfulTime := metav1.Now() + if currCon.LastSuccessfulTime.IsZero() { + lastSuccessfulTime = currCon.LastSuccessfulTime + } return Condition{ Type: TypeReady, Status: corev1.ConditionFalse, Reason: "Pending", Message: "Worker is pending", LastTransitionTime: metav1.Now(), - LastSuccessfulTime: metav1.Now(), + LastSuccessfulTime: lastSuccessfulTime, } } func (worker Worker) ReadyCondition() Condition { + currCon := worker.Status.GetCondition(TypeReady) + // return current condition if condition not changed + if currCon.Status == corev1.ConditionTrue && currCon.Reason == "Running" { + return currCon + } return Condition{ Type: TypeReady, Status: corev1.ConditionTrue, @@ -90,13 +105,23 @@ func (worker Worker) ReadyCondition() Condition { } func (worker Worker) ErrorCondition(msg string) Condition { + currCon := worker.Status.GetCondition(TypeReady) + // return current condition if condition not changed + if currCon.Status == corev1.ConditionFalse && currCon.Reason == "Error" && currCon.Message == msg { + return currCon + } + // keep original LastSuccessfulTime if have + lastSuccessfulTime := metav1.Now() + if currCon.LastSuccessfulTime.IsZero() { + lastSuccessfulTime = currCon.LastSuccessfulTime + } return Condition{ Type: TypeReady, Status: corev1.ConditionFalse, Reason: "Error", Message: msg, + LastSuccessfulTime: lastSuccessfulTime, LastTransitionTime: metav1.Now(), - LastSuccessfulTime: metav1.Now(), } } diff --git a/controllers/worker_controller.go b/controllers/worker_controller.go index 881298aee..e456e882c 100644 --- a/controllers/worker_controller.go +++ b/controllers/worker_controller.go @@ -18,10 +18,10 @@ package controllers import ( "context" - "fmt" "reflect" "github.com/go-logr/logr" + "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -80,53 +80,53 @@ func (r *WorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res log.V(1).Info("Failed to get Worker") return ctrl.Result{}, client.IgnoreNotFound(err) } - log.V(5).Info("Get Worker instance") // Add a finalizer.Then, we can define some operations which should // occur before the Worker to be deleted. // More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/finalizers if newAdded := controllerutil.AddFinalizer(worker, arcadiav1alpha1.Finalizer); newAdded { - log.Info("Try to add Finalizer for Worker") + log.V(5).Info("Try to add Finalizer for Worker") if err = r.Update(ctx, worker); err != nil { - log.Error(err, "Failed to update Worker to add finalizer, will try again later") - return ctrl.Result{Requeue: true}, err + log.V(1).Info("Failed to update Worker to add finalizer") + return ctrl.Result{}, err } - log.Info("Adding Finalizer for Worker done") + log.V(5).Info("Adding Finalizer for Worker done") return ctrl.Result{Requeue: true}, nil } // Check if the Worker instance is marked to be deleted, which is // indicated by the deletion timestamp being set. if worker.GetDeletionTimestamp() != nil && controllerutil.ContainsFinalizer(worker, arcadiav1alpha1.Finalizer) { - log.Info("Performing Finalizer Operations for Worker before delete CR") + log.V(5).Info("Performing Finalizer Operations for Worker before delete CR") // TODO perform the finalizer operations here, for example: remove vectorstore data? - log.Info("Removing Finalizer for Worker after successfully performing the operations") + log.V(5).Info("Removing Finalizer for Worker after successfully performing the operations") controllerutil.RemoveFinalizer(worker, arcadiav1alpha1.Finalizer) if err = r.Update(ctx, worker); err != nil { - log.Error(err, "Failed to remove finalizer for Worker") + log.V(1).Info("Failed to remove finalizer for Worker") return ctrl.Result{}, err } - log.Info("Remove Worker done") + log.V(1).Info("Remove Worker done") return ctrl.Result{}, nil } // initialize labels requeue, err := r.Initialize(ctx, log, worker) if err != nil { - log.Error(err, "unable to update labels after generation update") - return ctrl.Result{Requeue: true}, err + log.V(1).Info("Failed to update labels") + return ctrl.Result{}, err } if requeue { return ctrl.Result{Requeue: true}, nil } - // core reconcile logic - if err := r.reconcile(ctx, log, worker); err != nil { + err = r.reconcile(ctx, log, worker) + if err != nil { + log.Error(err, "Failed to reconcile worker") r.setCondition(worker, worker.ErrorCondition(err.Error())) } if updateStatusErr := r.patchStatus(ctx, worker); updateStatusErr != nil { - log.Error(updateStatusErr, "unable to update status after generation update") + log.Error(updateStatusErr, "Failed to patch worker status") return ctrl.Result{Requeue: true}, updateStatusErr } @@ -161,24 +161,25 @@ func (r *WorkerReconciler) reconcile(ctx context.Context, logger logr.Logger, wo // reconcile worker instance system, err := config.GetSystemDatasource(ctx, r.Client, nil) if err != nil { - return fmt.Errorf("failed to get system datasource with %w", err) + return errors.Wrap(err, "Failed to get system datasource") } w, err := arcadiaworker.NewPodWorker(ctx, r.Client, r.Scheme, worker, system) if err != nil { - return fmt.Errorf("failed to new a pod worker with %w", err) + return errors.Wrap(err, "Failed to new a pod worker") } + logger.V(5).Info("BeforeStart worker") if err := w.BeforeStart(ctx); err != nil { - return fmt.Errorf("failed to do BeforeStart: %w", err) + return errors.Wrap(err, "Failed to do BeforeStart") } - + logger.V(5).Info("Start worker") if err := w.Start(ctx); err != nil { - return fmt.Errorf("failed to do Start: %w", err) + return errors.Wrap(err, "Failed to do Start") } - + logger.V(5).Info("State worker") status, err := w.State(ctx) if err != nil { - return fmt.Errorf("failed to do State: %w", err) + return errors.Wrap(err, "Failed to do State") } // check & patch state @@ -197,7 +198,7 @@ func (r *WorkerReconciler) reconcile(ctx context.Context, logger logr.Logger, wo // further reconcile when worker is ready if worker.Status.IsReady() { if err := r.reconcileWhenWorkerReady(ctx, logger, worker, w.Model()); err != nil { - return fmt.Errorf("failed to reconcileWhenWorkerReady: %w", err) + return errors.Wrap(err, "Failed to do further reconcile when worker is ready") } } @@ -266,6 +267,10 @@ func (r *WorkerReconciler) patchStatus(ctx context.Context, worker *arcadiav1alp if err := r.Client.Get(ctx, client.ObjectKeyFromObject(worker), latest); err != nil { return err } + if reflect.DeepEqual(worker.Status, latest.Status) { + return nil + } + patch := client.MergeFrom(latest.DeepCopy()) latest.Status = worker.Status return r.Client.Status().Patch(ctx, latest, patch, client.FieldOwner("worker-controller")) @@ -278,12 +283,21 @@ func (r *WorkerReconciler) SetupWithManager(mgr ctrl.Manager) error { UpdateFunc: func(ue event.UpdateEvent) bool { oldWorker := ue.ObjectOld.(*arcadiav1alpha1.Worker) newWorker := ue.ObjectNew.(*arcadiav1alpha1.Worker) + return !reflect.DeepEqual(oldWorker.Spec, newWorker.Spec) || newWorker.DeletionTimestamp != nil }, })). Watches(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{ IsController: true, OwnerType: &arcadiav1alpha1.Worker{}, - }). + }, builder.WithPredicates( + predicate.Funcs{ + UpdateFunc: func(ue event.UpdateEvent) bool { + oldDep := ue.ObjectOld.(*appsv1.Deployment) + newDep := ue.ObjectNew.(*appsv1.Deployment) + return !reflect.DeepEqual(oldDep.Status, newDep.Status) + }, + }, + )). Complete(r) } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index ca0a8af3d..47addca5b 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -18,9 +18,9 @@ package worker import ( "context" - "errors" "fmt" + "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -169,6 +169,7 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar "app.kubernetes.io/name": worker.SuffixedName(), }, }, + Strategy: appsv1.DeploymentStrategy{Type: appsv1.RecreateDeploymentStrategyType}, }, } @@ -300,8 +301,8 @@ func (worker *PodWorker) Start(ctx context.Context) error { conRunner, _ := runner.(*corev1.Container) // initialize deployment - deployment := worker.deployment.DeepCopy() - deployment.Spec.Template = corev1.PodTemplateSpec{ + desiredDep := worker.deployment.DeepCopy() + desiredDep.Spec.Template = corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ "app.kubernetes.io/name": worker.SuffixedName(), @@ -314,23 +315,26 @@ func (worker *PodWorker) Start(ctx context.Context) error { Volumes: []corev1.Volume{worker.storage}, }, } - err = controllerutil.SetControllerReference(worker.w, deployment, worker.s) + err = controllerutil.SetControllerReference(worker.w, desiredDep, worker.s) if err != nil { return fmt.Errorf("failed to set owner reference with %w", err) } - currDeployment := &appsv1.Deployment{} - err = worker.c.Get(ctx, types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name}, currDeployment) + currDep := &appsv1.Deployment{} + err = worker.c.Get(ctx, types.NamespacedName{Namespace: desiredDep.Namespace, Name: desiredDep.Name}, currDep) switch ActionOnError(err) { case Panic: return err case Update: - err = worker.c.Update(ctx, deployment) + merged := MakeMergedDeployment(currDep, desiredDep) + // Update only when spec changed + err = worker.c.Patch(ctx, merged, client.MergeFrom(currDep)) if err != nil { - return fmt.Errorf("failed to create deployment with %w", err) + return errors.Wrap(err, "Failed to update worker") } + case Create: - err = worker.c.Create(ctx, deployment) + err = worker.c.Create(ctx, desiredDep) if err != nil { return fmt.Errorf("failed to create deployment with %w", err) } @@ -339,6 +343,14 @@ func (worker *PodWorker) Start(ctx context.Context) error { return nil } +func MakeMergedDeployment(target *appsv1.Deployment, desired *appsv1.Deployment) *appsv1.Deployment { + merged := target.DeepCopy() + + // merge this deployment with desired + merged.Spec.Template.Spec = desired.Spec.Template.Spec + return merged +} + // TODO: BeforeStop func (worker *PodWorker) BeforeStop(ctx context.Context) error { return nil @@ -363,7 +375,10 @@ func (worker *PodWorker) State(ctx context.Context) (any, error) { } if len(podList.Items) != 1 { - return nil, fmt.Errorf("expected 1 but got %d worker pods", len(podList.Items)) + return &corev1.PodStatus{ + Phase: corev1.PodUnknown, + Message: fmt.Sprintf("Expected one pod but got %d", len(podList.Items)), + }, nil } return &podList.Items[0].Status, nil