Merge pull request #382 from bjwswang/gql
fix: updatetimestamp refreshed unexpectedly
bjwswang authored Dec 15, 2023
2 parents 7bd8df6 + bbec743 commit 511dddd
Showing 3 changed files with 90 additions and 36 deletions.
29 changes: 27 additions & 2 deletions api/base/v1alpha1/worker.go
@@ -68,17 +68,32 @@ func (worker Worker) MakeRegistrationModelName() string {
}

func (worker Worker) PendingCondition() Condition {
currCon := worker.Status.GetCondition(TypeReady)
// return the current condition if it has not changed
if currCon.Status == corev1.ConditionFalse && currCon.Reason == "Pending" {
return currCon
}
// keep the original LastSuccessfulTime if one is already set
lastSuccessfulTime := metav1.Now()
if !currCon.LastSuccessfulTime.IsZero() {
lastSuccessfulTime = currCon.LastSuccessfulTime
}
return Condition{
Type: TypeReady,
Status: corev1.ConditionFalse,
Reason: "Pending",
Message: "Worker is pending",
LastTransitionTime: metav1.Now(),
LastSuccessfulTime: metav1.Now(),
LastSuccessfulTime: lastSuccessfulTime,
}
}

func (worker Worker) ReadyCondition() Condition {
currCon := worker.Status.GetCondition(TypeReady)
// return the current condition if it has not changed
if currCon.Status == corev1.ConditionTrue && currCon.Reason == "Running" {
return currCon
}
return Condition{
Type: TypeReady,
Status: corev1.ConditionTrue,
@@ -90,13 +105,23 @@ func (worker Worker) ReadyCondition() Condition {
}

func (worker Worker) ErrorCondition(msg string) Condition {
currCon := worker.Status.GetCondition(TypeReady)
// return the current condition if it has not changed
if currCon.Status == corev1.ConditionFalse && currCon.Reason == "Error" && currCon.Message == msg {
return currCon
}
// keep the original LastSuccessfulTime if one is already set
lastSuccessfulTime := metav1.Now()
if !currCon.LastSuccessfulTime.IsZero() {
lastSuccessfulTime = currCon.LastSuccessfulTime
}
return Condition{
Type: TypeReady,
Status: corev1.ConditionFalse,
Reason: "Error",
Message: msg,
LastSuccessfulTime: lastSuccessfulTime,
LastTransitionTime: metav1.Now(),
LastSuccessfulTime: metav1.Now(),
}
}
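
The three helpers above share one pattern: if the condition is unchanged, return the stored condition untouched, and otherwise fall back to metav1.Now() for LastSuccessfulTime only when no earlier success time exists. A small, self-contained sketch of that idea (illustrative only; the type and function names below are hypothetical stand-ins, not the project's API):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// condition and next stand in for the Worker's Condition type and its
// PendingCondition/ReadyCondition/ErrorCondition helpers.
type condition struct {
	status             corev1.ConditionStatus
	reason             string
	lastTransitionTime metav1.Time
	lastSuccessfulTime metav1.Time
}

func next(curr condition, status corev1.ConditionStatus, reason string) condition {
	// unchanged condition: keep both timestamps as they are
	if curr.status == status && curr.reason == reason {
		return curr
	}
	// keep the original LastSuccessfulTime when one exists
	last := metav1.Now()
	if !curr.lastSuccessfulTime.IsZero() {
		last = curr.lastSuccessfulTime
	}
	return condition{status: status, reason: reason, lastTransitionTime: metav1.Now(), lastSuccessfulTime: last}
}

func main() {
	first := next(condition{}, corev1.ConditionFalse, "Pending")
	second := next(first, corev1.ConditionFalse, "Pending")
	// prints true: a repeated "Pending" does not refresh the transition time
	fmt.Println(second.lastTransitionTime.Time.Equal(first.lastTransitionTime.Time))
}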

62 changes: 38 additions & 24 deletions controllers/worker_controller.go
@@ -18,10 +18,10 @@ package controllers

import (
"context"
"fmt"
"reflect"

"github.com/go-logr/logr"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -80,53 +80,53 @@ func (r *WorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
log.V(1).Info("Failed to get Worker")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
log.V(5).Info("Get Worker instance")

// Add a finalizer. Then, we can define operations that should
// occur before the Worker is deleted.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/finalizers
if newAdded := controllerutil.AddFinalizer(worker, arcadiav1alpha1.Finalizer); newAdded {
log.Info("Try to add Finalizer for Worker")
log.V(5).Info("Try to add Finalizer for Worker")
if err = r.Update(ctx, worker); err != nil {
log.Error(err, "Failed to update Worker to add finalizer, will try again later")
return ctrl.Result{Requeue: true}, err
log.V(1).Info("Failed to update Worker to add finalizer")
return ctrl.Result{}, err
}
log.Info("Adding Finalizer for Worker done")
log.V(5).Info("Adding Finalizer for Worker done")
return ctrl.Result{Requeue: true}, nil
}

// Check if the Worker instance is marked to be deleted, which is
// indicated by the deletion timestamp being set.
if worker.GetDeletionTimestamp() != nil && controllerutil.ContainsFinalizer(worker, arcadiav1alpha1.Finalizer) {
log.Info("Performing Finalizer Operations for Worker before delete CR")
log.V(5).Info("Performing Finalizer Operations for Worker before delete CR")
// TODO perform the finalizer operations here, for example: remove vectorstore data?
log.Info("Removing Finalizer for Worker after successfully performing the operations")
log.V(5).Info("Removing Finalizer for Worker after successfully performing the operations")
controllerutil.RemoveFinalizer(worker, arcadiav1alpha1.Finalizer)
if err = r.Update(ctx, worker); err != nil {
log.Error(err, "Failed to remove finalizer for Worker")
log.V(1).Info("Failed to remove finalizer for Worker")
return ctrl.Result{}, err
}
log.Info("Remove Worker done")
log.V(1).Info("Remove Worker done")
return ctrl.Result{}, nil
}

// initialize labels
requeue, err := r.Initialize(ctx, log, worker)
if err != nil {
log.Error(err, "unable to update labels after generation update")
return ctrl.Result{Requeue: true}, err
log.V(1).Info("Failed to update labels")
return ctrl.Result{}, err
}
if requeue {
return ctrl.Result{Requeue: true}, nil
}

// core reconcile logic
if err := r.reconcile(ctx, log, worker); err != nil {
err = r.reconcile(ctx, log, worker)
if err != nil {
log.Error(err, "Failed to reconcile worker")
r.setCondition(worker, worker.ErrorCondition(err.Error()))
}

if updateStatusErr := r.patchStatus(ctx, worker); updateStatusErr != nil {
log.Error(updateStatusErr, "unable to update status after generation update")
log.Error(updateStatusErr, "Failed to patch worker status")
return ctrl.Result{Requeue: true}, updateStatusErr
}

@@ -161,24 +161,25 @@ func (r *WorkerReconciler) reconcile(ctx context.Context, logger logr.Logger, wo
// reconcile worker instance
system, err := config.GetSystemDatasource(ctx, r.Client, nil)
if err != nil {
return fmt.Errorf("failed to get system datasource with %w", err)
return errors.Wrap(err, "Failed to get system datasource")
}
w, err := arcadiaworker.NewPodWorker(ctx, r.Client, r.Scheme, worker, system)
if err != nil {
return fmt.Errorf("failed to new a pod worker with %w", err)
return errors.Wrap(err, "Failed to new a pod worker")
}

logger.V(5).Info("BeforeStart worker")
if err := w.BeforeStart(ctx); err != nil {
return fmt.Errorf("failed to do BeforeStart: %w", err)
return errors.Wrap(err, "Failed to do BeforeStart")
}

logger.V(5).Info("Start worker")
if err := w.Start(ctx); err != nil {
return fmt.Errorf("failed to do Start: %w", err)
return errors.Wrap(err, "Failed to do Start")
}

logger.V(5).Info("State worker")
status, err := w.State(ctx)
if err != nil {
return fmt.Errorf("failed to do State: %w", err)
return errors.Wrap(err, "Failed to do State")
}

// check & patch state
@@ -197,7 +198,7 @@ func (r *WorkerReconciler) reconcile(ctx context.Context, logger logr.Logger, wo
// further reconcile when worker is ready
if worker.Status.IsReady() {
if err := r.reconcileWhenWorkerReady(ctx, logger, worker, w.Model()); err != nil {
return fmt.Errorf("failed to reconcileWhenWorkerReady: %w", err)
return errors.Wrap(err, "Failed to do further reconcile when worker is ready")
}
}

@@ -266,6 +267,10 @@ func (r *WorkerReconciler) patchStatus(ctx context.Context, worker *arcadiav1alp
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(worker), latest); err != nil {
return err
}
if reflect.DeepEqual(worker.Status, latest.Status) {
return nil
}

patch := client.MergeFrom(latest.DeepCopy())
latest.Status = worker.Status
return r.Client.Status().Patch(ctx, latest, patch, client.FieldOwner("worker-controller"))
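
The reflect.DeepEqual guard means a reconcile that computes an identical status no longer patches anything, so repeated reconciles stop churning the object. A rough sketch of the same pattern as a standalone helper (hypothetical name, using only the calls already shown in this hunk and the controller's existing imports):

// patchStatusIfChanged is illustrative only; the controller keeps this logic inline.
func patchStatusIfChanged(ctx context.Context, c client.Client, latest, desired *arcadiav1alpha1.Worker) error {
	if reflect.DeepEqual(latest.Status, desired.Status) {
		// nothing changed: skip the patch entirely
		return nil
	}
	patch := client.MergeFrom(latest.DeepCopy())
	latest.Status = desired.Status
	return c.Status().Patch(ctx, latest, patch, client.FieldOwner("worker-controller"))
}
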
@@ -278,12 +283,21 @@ func (r *WorkerReconciler) SetupWithManager(mgr ctrl.Manager) error {
UpdateFunc: func(ue event.UpdateEvent) bool {
oldWorker := ue.ObjectOld.(*arcadiav1alpha1.Worker)
newWorker := ue.ObjectNew.(*arcadiav1alpha1.Worker)

return !reflect.DeepEqual(oldWorker.Spec, newWorker.Spec) || newWorker.DeletionTimestamp != nil
},
})).
Watches(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &arcadiav1alpha1.Worker{},
}).
}, builder.WithPredicates(
predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
oldDep := ue.ObjectOld.(*appsv1.Deployment)
newDep := ue.ObjectNew.(*appsv1.Deployment)
return !reflect.DeepEqual(oldDep.Status, newDep.Status)
},
},
)).
Complete(r)
}
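
The Deployment predicate above only lets an update event through when the Deployment's status changed, so the controller's own spec patches on the owned Deployment do not immediately requeue the Worker. For the Worker watch itself, controller-runtime's built-in generation predicate is a common alternative to the manual DeepEqual-on-Spec check; a sketch for comparison (it does not cover the DeletionTimestamp != nil case, so it is not a drop-in replacement here):

// Sketch only, assuming the builder and predicate packages already imported above.
return ctrl.NewControllerManagedBy(mgr).
	For(&arcadiav1alpha1.Worker{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
	Complete(r)
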
35 changes: 25 additions & 10 deletions pkg/worker/worker.go
@@ -18,9 +18,9 @@ package worker

import (
"context"
"errors"
"fmt"

"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -169,6 +169,7 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar
"app.kubernetes.io/name": worker.SuffixedName(),
},
},
Strategy: appsv1.DeploymentStrategy{Type: appsv1.RecreateDeploymentStrategyType},
},
}

@@ -300,8 +301,8 @@ func (worker *PodWorker) Start(ctx context.Context) error {
conRunner, _ := runner.(*corev1.Container)

// initialize deployment
deployment := worker.deployment.DeepCopy()
deployment.Spec.Template = corev1.PodTemplateSpec{
desiredDep := worker.deployment.DeepCopy()
desiredDep.Spec.Template = corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app.kubernetes.io/name": worker.SuffixedName(),
Expand All @@ -314,23 +315,26 @@ func (worker *PodWorker) Start(ctx context.Context) error {
Volumes: []corev1.Volume{worker.storage},
},
}
err = controllerutil.SetControllerReference(worker.w, deployment, worker.s)
err = controllerutil.SetControllerReference(worker.w, desiredDep, worker.s)
if err != nil {
return fmt.Errorf("failed to set owner reference with %w", err)
}

currDeployment := &appsv1.Deployment{}
err = worker.c.Get(ctx, types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name}, currDeployment)
currDep := &appsv1.Deployment{}
err = worker.c.Get(ctx, types.NamespacedName{Namespace: desiredDep.Namespace, Name: desiredDep.Name}, currDep)
switch ActionOnError(err) {
case Panic:
return err
case Update:
err = worker.c.Update(ctx, deployment)
merged := MakeMergedDeployment(currDep, desiredDep)
// Update only when spec changed
err = worker.c.Patch(ctx, merged, client.MergeFrom(currDep))
if err != nil {
return fmt.Errorf("failed to create deployment with %w", err)
return errors.Wrap(err, "Failed to update worker")
}

case Create:
err = worker.c.Create(ctx, deployment)
err = worker.c.Create(ctx, desiredDep)
if err != nil {
return fmt.Errorf("failed to create deployment with %w", err)
}
Expand All @@ -339,6 +343,14 @@ func (worker *PodWorker) Start(ctx context.Context) error {
return nil
}

func MakeMergedDeployment(target *appsv1.Deployment, desired *appsv1.Deployment) *appsv1.Deployment {
merged := target.DeepCopy()

// merge this deployment with desired
merged.Spec.Template.Spec = desired.Spec.Template.Spec
return merged
}
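
Merging only Spec.Template.Spec onto a deep copy of the live Deployment leaves server-defaulted fields (replicas, selector, and so on) untouched, and the merge patch computed against currDep is empty when the pod template did not change, so the stored object stays as it is. Usage mirrors the Update branch in Start() above (repeated here as a sketch):

// An unchanged pod template yields an empty merge patch, so nothing is rewritten.
merged := MakeMergedDeployment(currDep, desiredDep)
if err := worker.c.Patch(ctx, merged, client.MergeFrom(currDep)); err != nil {
	return errors.Wrap(err, "Failed to update worker")
}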

// TODO: BeforeStop
func (worker *PodWorker) BeforeStop(ctx context.Context) error {
return nil
@@ -363,7 +375,10 @@ func (worker *PodWorker) State(ctx context.Context) (any, error) {
}

if len(podList.Items) != 1 {
return nil, fmt.Errorf("expected 1 but got %d worker pods", len(podList.Items))
return &corev1.PodStatus{
Phase: corev1.PodUnknown,
Message: fmt.Sprintf("Expected one pod but got %d", len(podList.Items)),
}, nil
}

return &podList.Items[0].Status, nil