diff --git a/api/model/v1alpha1/modeladapter_types.go b/api/model/v1alpha1/modeladapter_types.go index ebeca056..05d18d35 100644 --- a/api/model/v1alpha1/modeladapter_types.go +++ b/api/model/v1alpha1/modeladapter_types.go @@ -64,17 +64,21 @@ type ModelAdapterPhase string const ( // ModelAdapterPending means the CR has been created and that's the initial status ModelAdapterPending ModelAdapterPhase = "Pending" - // ModelAdapterScheduling means the ModelAdapter is pending scheduling - ModelAdapterScheduling ModelAdapterPhase = "Scheduling" - // ModelAdapterBinding means the controller loads ModelAdapter on a selected pod - ModelAdapterBinding ModelAdapterPhase = "Binding" + // ModelAdapterScheduled means the ModelAdapter is pending scheduling + ModelAdapterScheduled ModelAdapterPhase = "Scheduled" + // ModelAdapterBound means the controller loads ModelAdapter on a selected pod + ModelAdapterBound ModelAdapterPhase = "Bound" + // ModelAdapterResourceCreated means the model adapter owned resources have been created + ModelAdapterResourceCreated ModelAdapterPhase = "ResourceCreated" // ModelAdapterRunning means ModelAdapter has been running on the pod ModelAdapterRunning ModelAdapterPhase = "Running" // ModelAdapterFailed means ModelAdapter has terminated in a failure ModelAdapterFailed ModelAdapterPhase = "Failed" - // ModelAdapterScaling means ModelAdapter is scaling, could be scaling in or out. won't be enabled until we allow multiple replicas + // ModelAdapterUnknown means ModelAdapter clean up some stable resources + ModelAdapterUnknown ModelAdapterPhase = "Unknown" + // ModelAdapterScaled means ModelAdapter is scaled, could be scaling in or out. won't be enabled until we allow multiple replicas // TODO: not implemented yet. - ModelAdapterScaling ModelAdapterPhase = "Scaling" + ModelAdapterScaled ModelAdapterPhase = "Scaled" ) // ModelAdapterStatus defines the observed state of ModelAdapter @@ -97,11 +101,10 @@ type ModelAdapterConditionType string const ( ModelAdapterConditionTypeInitialized ModelAdapterConditionType = "Initialized" - ModelAdapterConditionTypeSelectorMatched ModelAdapterConditionType = "SelectorMatched" ModelAdapterConditionTypeScheduled ModelAdapterConditionType = "Scheduled" + ModelAdapterConditionTypeBound ModelAdapterConditionType = "Bound" ModelAdapterConditionTypeResourceCreated ModelAdapterConditionType = "ResourceCreated" ModelAdapterConditionReady ModelAdapterConditionType = "Ready" - ModelAdapterConditionCleanup ModelAdapterConditionType = "Cleanup" ) // +genclient diff --git a/pkg/controller/modeladapter/modeladapter_controller.go b/pkg/controller/modeladapter/modeladapter_controller.go index e7422454..fe4407ef 100644 --- a/pkg/controller/modeladapter/modeladapter_controller.go +++ b/pkg/controller/modeladapter/modeladapter_controller.go @@ -55,11 +55,33 @@ import ( ) const ( - //ControllerUIDLabelKey = "model-adapter-controller-uid" ModelIdentifierKey = "model.aibrix.ai/name" ModelAdapterFinalizer = "adapter.model.aibrix.ai/finalizer" ModelAdapterPodTemplateLabelKey = "adapter.model.aibrix.ai/enabled" ModelAdapterPodTemplateLabelValue = "true" + + // Reasons for model adapter conditions + // Processing: + + // ModelAdapterInitializedReason is added in model adapter when it comes into the reconciliation loop. + ModelAdapterInitializedReason = "ModelAdapterPending" + // FailedServiceCreateReason is added in a model adapter when it cannot create a new service. + FailedServiceCreateReason = "ServiceCreateError" + // FailedEndpointSliceCreateReason is added in a model adapter when it cannot create a new replica set. + FailedEndpointSliceCreateReason = "EndpointSliceCreateError" + // ModelAdapterLoadingErrorReason is added in a model adapter when it cannot be loaded in an engine pod. + ModelAdapterLoadingErrorReason = "ModelAdapterLoadingError" + // ValidationFailedReason is added when model adapter object fails the validation + ValidationFailedReason = "ValidationFailed" + // StableInstanceFoundReason is added if there's stale pod and instance has been deleted successfully. + StableInstanceFoundReason = "StableInstanceFound" + + // Available: + + // ModelAdapterAvailable is added in a ModelAdapter when it has replicas available. + ModelAdapterAvailable = "ModelAdapterAvailable" + // ModelAdapterUnavailable is added in a ModelAdapter when it doesn't have any pod hosting it. + ModelAdapterUnavailable = "ModelAdapterUnavailable" ) var ( @@ -227,13 +249,13 @@ func (r *ModelAdapterReconciler) Reconcile(ctx context.Context, req ctrl.Request if err != nil { if apierrors.IsNotFound(err) { // Object not found, return. - // For service, endpoint objects, clean up the resources using finalizers/ - klog.InfoS("ModelAdapter resource not found. Ignoring since object mush be deleted", "modelAdapter", req) + // For service, endpoint objects, clean up the resources using finalizers + klog.InfoS("ModelAdapter resource not found. Ignoring since object mush be deleted", "modelAdapter", req.NamespacedName) return reconcile.Result{}, nil } // Error reading the object and let's requeue the request - klog.ErrorS(err, "Failed to get ModelAdapter", "modelAdapter", klog.KObj(modelAdapter)) + klog.ErrorS(err, "Failed to get ModelAdapter", "ModelAdapter", klog.KObj(modelAdapter)) return reconcile.Result{}, err } @@ -241,7 +263,7 @@ func (r *ModelAdapterReconciler) Reconcile(ctx context.Context, req ctrl.Request // the object is not being deleted, so if it does not have the finalizer, // then lets add the finalizer and update the object. if !controllerutil.ContainsFinalizer(modelAdapter, ModelAdapterFinalizer) { - klog.InfoS("Adding finalizer for ModelAdapter") + klog.InfoS("Adding finalizer for ModelAdapter", "ModelAdapter", klog.KObj(modelAdapter)) if ok := controllerutil.AddFinalizer(modelAdapter, ModelAdapterFinalizer); !ok { klog.Error("Failed to add finalizer for ModelAdapter") return ctrl.Result{Requeue: true}, nil @@ -255,8 +277,9 @@ func (r *ModelAdapterReconciler) Reconcile(ctx context.Context, req ctrl.Request // the object is being deleted if controllerutil.ContainsFinalizer(modelAdapter, ModelAdapterFinalizer) { // the finalizer is present, so let's unload lora from those inference engines + // note: the base model pod could be deleted as well, so here we do best effort offloading + // we do not need to reconcile the object if it encounters the unloading error. if err := r.unloadModelAdapter(modelAdapter); err != nil { - // if fail to delete unload lora here, return the error so it can be retried. return ctrl.Result{}, err } if ok := controllerutil.RemoveFinalizer(modelAdapter, ModelAdapterFinalizer); !ok { @@ -279,40 +302,24 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque // Let's set the initial status when no status is available if instance.Status.Conditions == nil || len(instance.Status.Conditions) == 0 { instance.Status.Phase = modelv1alpha1.ModelAdapterPending - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeInitialized), - Status: metav1.ConditionUnknown, - Reason: "Reconciling", - Message: "Starting reconciliation", - LastTransitionTime: metav1.Now()}) - - if err := r.Status().Update(ctx, instance); err != nil { - klog.ErrorS(err, "Failed to update ModelAdapter status", "modelAdapter", klog.KObj(instance)) + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeInitialized), metav1.ConditionUnknown, + ModelAdapterInitializedReason, "Starting reconciliation") + if err := r.updateStatus(ctx, instance, condition); err != nil { return reconcile.Result{}, err - } - - // re-fetch the custom resource after updating the status to avoid 409 error here. - if err := r.Get(ctx, req.NamespacedName, instance); err != nil { - klog.Error(err, "Failed to re-fetch modelAdapter") - return ctrl.Result{}, err + } else { + return reconcile.Result{Requeue: true}, nil } } oldInstance := instance.DeepCopy() - // Step 0: Validate ModelAdapter configurations if err := validateModelAdapter(instance); err != nil { klog.Error(err, "Failed to validate the ModelAdapter") - instance.Status.Phase = modelv1alpha1.ModelAdapterFailed - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), - Status: metav1.ConditionFalse, - Reason: "ValidationFailed", - Message: "ModelAdapter resource is not valid", - LastTransitionTime: metav1.Now()}) - - if updateErr := r.Status().Update(ctx, instance); updateErr != nil { + condition := NewCondition(string(modelv1alpha1.ModelAdapterPending), metav1.ConditionFalse, + ValidationFailedReason, "ModelAdapter resource is not valid") + // TODO: no need to update the status if the status remain the same + if updateErr := r.updateStatus(ctx, instance, condition); updateErr != nil { klog.ErrorS(err, "Failed to update ModelAdapter status", "modelAdapter", klog.KObj(instance)) return reconcile.Result{}, updateErr } @@ -330,11 +337,10 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque // TODO: this needs to be changed once we support multiple lora adapters selectedPodName := instance.Status.Instances[0] if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: selectedPodName}, selectedPod); err != nil && apierrors.IsNotFound(err) { - klog.ErrorS(err, "Failed to get selected pod but pod is still in ModelAdapter instance list", "modelAdapter", klog.KObj(instance)) - + klog.ErrorS(err, "Selected pod has been deleted and it should be removed from model adapter instance list", "modelAdapter", klog.KObj(instance)) // instance.Status.Instances has been outdated, and we need to clear the pod list // after the pod list is cleaned up, let's reconcile the instance object again in the next loop - return ctrl.Result{}, r.clearModelAdapterInstanceList(ctx, instance) + return ctrl.Result{}, r.clearModelAdapterInstanceList(ctx, instance, selectedPodName) } else if err != nil { // failed to fetch the pod, let's requeue return ctrl.Result{RequeueAfter: defaultRequeueDuration}, err @@ -345,18 +351,15 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque // TODO: this should barely happen, let's move this logic to earlier validation logics. return ctrl.Result{}, fmt.Errorf("failed to convert pod selector: %v", err) } - if !selector.Matches(labels.Set(selectedPod.Labels)) { klog.Warning("current assigned pod selector doesn't match model adapter selector") - return ctrl.Result{}, r.clearModelAdapterInstanceList(ctx, instance) + return ctrl.Result{}, r.clearModelAdapterInstanceList(ctx, instance, selectedPodName) } + // base model pod could be unhealthy or in termination, let's clean up the instances. if !utils.IsPodReady(selectedPod) || utils.IsPodTerminating(selectedPod) { - klog.Warning(fmt.Sprintf("current assigned pod %s/%s is not ready, let's clean it up and reschedule the adapter", selectedPod.Namespace, selectedPod.Name)) - // continue to requeue the object to remove endpoint etc in current loop. - if err = r.clearModelAdapterInstanceList(ctx, instance); err != nil { - return ctrl.Result{}, err - } + klog.Warningf("current assigned pod %s/%s is not ready, remove it and reschedule the adapter", selectedPod.Namespace, selectedPod.Name) + return ctrl.Result{}, r.clearModelAdapterInstanceList(ctx, instance, selectedPodName) } existPods = true @@ -366,39 +369,41 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque if !existPods { // TODO: as we plan to support lora replicas, it needs some corresponding changes. // it should return a list of pods in future, otherwise, it should be invoked by N times. - selectedPod, err = r.schedulePod(ctx, instance) + activePods, err := r.getActivePodsForModelAdapter(ctx, instance) if err != nil { - klog.ErrorS(err, "Failed to schedule Pod for ModelAdapter", "modelAdapter", instance.Name) return ctrl.Result{}, err } - if selectedPod != nil { - instance.Status.Phase = modelv1alpha1.ModelAdapterScheduling - instance.Status.Instances = []string{selectedPod.Name} - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeSelectorMatched), - Status: metav1.ConditionTrue, - Reason: "Reconciling", - Message: fmt.Sprintf("ModelAdapter %s has been allocated to pod %s", klog.KObj(instance), selectedPod.Name), - LastTransitionTime: metav1.Now(), - }) + if len(activePods) != 0 { + selectedPod, err = r.schedulePod(ctx, instance, activePods) + if err != nil { + klog.ErrorS(err, "Failed to schedule Pod for ModelAdapter", "modelAdapter", klog.KObj(instance)) + return ctrl.Result{}, err + } - if err := r.Status().Update(ctx, instance); err != nil { - klog.InfoS("Got error when updating status", "cluster name", req.Name, "error", err, "ModelAdapter", instance) + instance.Status.Phase = modelv1alpha1.ModelAdapterScheduled + instance.Status.Instances = append(instance.Status.Instances, selectedPod.Name) + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeScheduled), metav1.ConditionTrue, + "Scheduled", fmt.Sprintf("ModelAdapter %s has been allocated to pod %s/%s", klog.KObj(instance), selectedPod.GetNamespace(), selectedPod.GetName())) + if err := r.updateStatus(ctx, instance, condition); err != nil { + klog.InfoS("Got error when updating status", "error", err, "ModelAdapter", instance) return ctrl.Result{}, err } return ctrl.Result{Requeue: true}, nil + } else { + klog.Warningf("no active pods found for model adapter %v", klog.KObj(instance)) } - // selectedPod is nil means there's no valid pods, it should wait for new pods coming pod or any pod related changes like label change. } // Step 2: Reconcile Loading - if err := r.reconcileLoading(ctx, instance, selectedPod); err != nil { + if err := r.reconcileLoading(ctx, instance); err != nil { // retry any of the failure. - instance.Status.Phase = modelv1alpha1.ModelAdapterFailed - if err := r.Status().Update(ctx, instance); err != nil { + instance.Status.Phase = modelv1alpha1.ModelAdapterBound + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeBound), metav1.ConditionFalse, + ModelAdapterLoadingErrorReason, fmt.Sprintf("ModelAdapter %s is loaded", klog.KObj(instance))) + if err := r.updateStatus(ctx, instance, condition); err != nil { klog.InfoS("Got error when updating status", "cluster name", req.Name, "error", err, "ModelAdapter", instance) - return ctrl.Result{RequeueAfter: defaultRequeueDuration}, err + return ctrl.Result{}, err } return ctrl.Result{RequeueAfter: defaultRequeueDuration}, err @@ -406,24 +411,33 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque // Step 3: Reconcile Service if ctrlResult, err := r.reconcileService(ctx, instance); err != nil { - if updateErr := r.updateModelAdapterState(ctx, instance, modelv1alpha1.ModelAdapterFailed); updateErr != nil { - klog.ErrorS(updateErr, "ModelAdapter update state error", "cluster name", req.Name) + instance.Status.Phase = modelv1alpha1.ModelAdapterResourceCreated + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse, + FailedServiceCreateReason, "service creation failure") + if err := r.updateStatus(ctx, instance, condition); err != nil { + klog.InfoS("Got error when updating status", req.Name, "error", err, "ModelAdapter", instance) + return ctrl.Result{}, err } return ctrlResult, err } // Step 4: Reconcile EndpointSlice - if ctrlResult, err := r.reconcileEndpointSlice(ctx, instance, selectedPod); err != nil { - if updateErr := r.updateModelAdapterState(ctx, instance, modelv1alpha1.ModelAdapterFailed); updateErr != nil { - klog.ErrorS(updateErr, "ModelAdapter update state error", "cluster name", req.Name) + if ctrlResult, err := r.reconcileEndpointSlice(ctx, instance); err != nil { + instance.Status.Phase = modelv1alpha1.ModelAdapterResourceCreated + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse, + FailedEndpointSliceCreateReason, "endpointslice creation failure") + if err := r.updateStatus(ctx, instance, condition); err != nil { + klog.InfoS("Got error when updating status", "error", err, "ModelAdapter", instance) + return ctrl.Result{}, err } return ctrlResult, err } - // Check if need to update the status. + // Check if we need to update the status. if r.inconsistentModelAdapterStatus(oldInstance.Status, instance.Status) { - klog.InfoS("model adapter reconcile", "Update CR status", req.Name, "status", instance.Status) - if err = r.updateStatus(ctx, instance); err != nil { + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionReady), metav1.ConditionTrue, + ModelAdapterAvailable, fmt.Sprintf("ModelAdapter %s is ready", klog.KObj(instance))) + if err = r.updateStatus(ctx, instance, condition); err != nil { return reconcile.Result{}, fmt.Errorf("update modelAdapter status error: %v", err) } } @@ -431,90 +445,88 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, nil } -func (r *ModelAdapterReconciler) updateStatus(ctx context.Context, instance *modelv1alpha1.ModelAdapter) error { - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionReady), - Status: metav1.ConditionTrue, - Reason: "Reconciling", - Message: fmt.Sprintf("ModelAdapter %s is ready", klog.KObj(instance)), - LastTransitionTime: metav1.Now(), - }) - +func (r *ModelAdapterReconciler) updateStatus(ctx context.Context, instance *modelv1alpha1.ModelAdapter, condition metav1.Condition) error { + klog.InfoS("model adapter reconcile", "Update CR status", instance.Name, "status", instance.Status) + meta.SetStatusCondition(&instance.Status.Conditions, condition) return r.Status().Update(ctx, instance) } -func (r *ModelAdapterReconciler) clearModelAdapterInstanceList(ctx context.Context, instance *modelv1alpha1.ModelAdapter) error { - stalePodName := instance.Status.Instances[0] - instance.Status.Instances = []string{} - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionCleanup), - Status: metav1.ConditionTrue, - Reason: "Reconciling", - Message: fmt.Sprintf("Pod (%s) can not be fetched or invalid for model adapter (%s), clean up the list", stalePodName, instance.Name), - LastTransitionTime: metav1.Now(), - }) - +func (r *ModelAdapterReconciler) clearModelAdapterInstanceList(ctx context.Context, instance *modelv1alpha1.ModelAdapter, stalePodName string) error { + instance.Status.Instances = RemoveInstanceFromList(instance.Status.Instances, stalePodName) // remove instance means the lora has not targets at this moment. instance.Status.Phase = modelv1alpha1.ModelAdapterPending + condition := NewCondition(string(modelv1alpha1.ModelAdapterFailed), metav1.ConditionTrue, + StableInstanceFoundReason, + fmt.Sprintf("Pod (%s/%s) is stale or invalid for model adapter (%s/%s), clean up the list", instance.GetNamespace(), stalePodName, instance.GetNamespace(), instance.Name)) - if err := r.Status().Update(ctx, instance); err != nil { - klog.Error(err, "Failed to update modelAdapter status") + if err := r.updateStatus(ctx, instance, condition); err != nil { return err } return nil } -func (r *ModelAdapterReconciler) schedulePod(ctx context.Context, instance *modelv1alpha1.ModelAdapter) (*corev1.Pod, error) { - // Implement your scheduling logic here to select a Pod based on the instance.Spec.PodSelector - // For the sake of example, we will just list the Pods matching the selector and pick the first one +// getActivePodsForModelAdapter retrieves all pods matching the selector and filters them to only include active ones +func (r *ModelAdapterReconciler) getActivePodsForModelAdapter(ctx context.Context, instance *modelv1alpha1.ModelAdapter) ([]corev1.Pod, error) { podList := &corev1.PodList{} listOpts := []client.ListOption{ - client.InNamespace(instance.Namespace), + client.InNamespace(instance.GetNamespace()), client.MatchingLabels(instance.Spec.PodSelector.MatchLabels), } + + // List all pods matching the label selector if err := r.List(ctx, podList, listOpts...); err != nil { return nil, err } - // filter active pod + // Filter out terminating or not ready pods var activePods []corev1.Pod - for _, pod := range podList.Items { if !utils.IsPodTerminating(&pod) && utils.IsPodReady(&pod) { activePods = append(activePods, pod) } } - if len(activePods) == 0 { - klog.Warning("no pods found matching selector") - return nil, nil - } + return activePods, nil +} +// schedulePod picks a valid pod to schedule the model adapter +func (r *ModelAdapterReconciler) schedulePod(ctx context.Context, instance *modelv1alpha1.ModelAdapter, activePods []corev1.Pod) (*corev1.Pod, error) { + // Implement your scheduling logic here to select a Pod based on the instance.Spec.PodSelector + // For the sake of example, we will just list the Pods matching the selector and pick the first one return r.scheduler.SelectPod(ctx, activePods) } -func (r *ModelAdapterReconciler) reconcileLoading(ctx context.Context, instance *modelv1alpha1.ModelAdapter, pod *corev1.Pod) error { - if pod == nil { +func (r *ModelAdapterReconciler) reconcileLoading(ctx context.Context, instance *modelv1alpha1.ModelAdapter) error { + if len(instance.Status.Instances) == 0 { return nil } + targetPod := &corev1.Pod{} + podName := instance.Status.Instances[0] + err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: podName}, targetPod) + if err != nil && apierrors.IsNotFound(err) { + return fmt.Errorf("pod %s/%s can not be found, skip loading", instance.GetName(), podName) + } else if err != nil { + return err + } + // selectPod could be in termination, in this case, we just do nothing. - if pod.DeletionTimestamp != nil { + if targetPod.DeletionTimestamp != nil { return nil } // Define the key you want to check key := "DEBUG_MODE" value, exists := getEnvKey(key) - host := fmt.Sprintf("http://%s:8000", pod.Status.PodIP) + host := fmt.Sprintf("http://%s:8000", targetPod.Status.PodIP) if exists && value == "on" { // 30080 is the nodePort of the base model service. host = fmt.Sprintf("http://%s:30081", "localhost") } // Check if the model is already loaded - exists, err := r.modelAdapterExists(host, instance.Name) + exists, err = r.modelAdapterExists(host, instance.Name) if err != nil { return err } @@ -529,20 +541,6 @@ func (r *ModelAdapterReconciler) reconcileLoading(ctx context.Context, instance return err } - // Update the instance status - instance.Status.Phase = modelv1alpha1.ModelAdapterBinding - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeScheduled), - Status: metav1.ConditionTrue, - Reason: "Reconciling", - Message: fmt.Sprintf("ModelAdapter %s is loaded", klog.KObj(instance)), - LastTransitionTime: metav1.Now(), - }) - if err := r.Status().Update(ctx, instance); err != nil { - klog.InfoS("Got error when updating status", "error", err, "ModelAdapter", instance) - return err - } - return nil } @@ -633,9 +631,10 @@ func (r *ModelAdapterReconciler) loadModelAdapter(host string, instance *modelv1 } // unloadModelAdapter unloads the loras from inference engines +// base model pod could be deleted, in this case, we just do optimistic unloading. It only returns some necessary errors and http errors should not be returned. func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.ModelAdapter) error { if len(instance.Status.Instances) == 0 { - klog.Warning("model adapter has not been deployed to any pods yet, skip unloading") + klog.Warningf("model adapter %s/%s has not been deployed to any pods yet, skip unloading", instance.GetNamespace(), instance.GetName()) return nil } @@ -648,7 +647,7 @@ func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.Mode Name: podName, }, targetPod); err != nil { if apierrors.IsNotFound(err) { - // since the pod doesn't exist, unload is unnecessary + klog.Warningf("Failed to find lora Pod instance %s/%s from apiserver, skip unloading", instance.GetNamespace(), podName) return nil } klog.Warning("Error getting Pod from lora instance list", err) @@ -668,7 +667,7 @@ func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.Mode value, exists := getEnvKey(key) if exists && value == "on" { // 30080 is the nodePort of the base model service. - url = "http://localhost:30080/v1/unload_lora_adapter" + url = "http://localhost:30081/v1/unload_lora_adapter" } req, err := http.NewRequest("POST", url, bytes.NewBuffer(payloadBytes)) @@ -677,10 +676,10 @@ func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.Mode } req.Header.Set("Content-Type", "application/json") - client := &http.Client{} - resp, err := client.Do(req) + httpClient := &http.Client{} + resp, err := httpClient.Do(req) if err != nil { - return err + return nil } defer func() { if err := resp.Body.Close(); err != nil { @@ -690,47 +689,19 @@ func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.Mode if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("failed to unload LoRA adapter: %s", body) + klog.Warningf("failed to unload LoRA adapter: %s", body) } return nil } -func (r *ModelAdapterReconciler) updateModelAdapterState(ctx context.Context, instance *modelv1alpha1.ModelAdapter, phase modelv1alpha1.ModelAdapterPhase) error { - if instance.Status.Phase == phase { - return nil - } - instance.Status.Phase = phase - klog.InfoS("Update CR Status.Phase", "phase", phase) - return r.Status().Update(ctx, instance) -} - func (r *ModelAdapterReconciler) reconcileService(ctx context.Context, instance *modelv1alpha1.ModelAdapter) (ctrl.Result, error) { // Retrieve the Service from the Kubernetes cluster with the name and namespace. found := &corev1.Service{} - err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}, found) if err != nil && apierrors.IsNotFound(err) { // Service does not exist, create a new one - svc, err := buildModelAdapterService(instance) - if err != nil { - klog.ErrorS(err, "Failed to define new Service resource for ModelAdapter") - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), - Status: metav1.ConditionFalse, - Reason: "Reconciling", - Message: fmt.Sprintf("Failed to create Service for the custom resource (%s): (%s)", instance.Name, err), - LastTransitionTime: metav1.Now(), - }) - - if err := r.Status().Update(ctx, instance); err != nil { - klog.Error(err, "Failed to update modelAdapter status") - return ctrl.Result{}, err - } - - return ctrl.Result{}, err - } - + svc := buildModelAdapterService(instance) // Set the owner reference if err := ctrl.SetControllerReference(instance, svc, r.Scheme); err != nil { klog.Error(err, "Failed to set controller reference to modelAdapter") @@ -738,9 +709,14 @@ func (r *ModelAdapterReconciler) reconcileService(ctx context.Context, instance } // create service - klog.InfoS("Creating a new service", "service.namespace", svc.Namespace, "service.name", svc.Name) + klog.InfoS("Creating a new service", "service", klog.KObj(svc)) if err = r.Create(ctx, svc); err != nil { - klog.ErrorS(err, "Failed to create new service resource", "service.namespace", svc.Namespace, "service.name", svc.Name) + klog.ErrorS(err, "Failed to create new service resource for ModelAdapter", "service", klog.KObj(svc)) + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse, + FailedServiceCreateReason, fmt.Sprintf("Failed to create Service for the modeladapter (%s): (%s)", klog.KObj(instance), err)) + if err := r.updateStatus(ctx, instance, condition); err != nil { + return ctrl.Result{}, err + } return ctrl.Result{}, err } } else if err != nil { @@ -754,66 +730,90 @@ func (r *ModelAdapterReconciler) reconcileService(ctx context.Context, instance return ctrl.Result{}, nil } -func (r *ModelAdapterReconciler) reconcileEndpointSlice(ctx context.Context, instance *modelv1alpha1.ModelAdapter, pod *corev1.Pod) (ctrl.Result, error) { +func (r *ModelAdapterReconciler) reconcileEndpointSlice(ctx context.Context, instance *modelv1alpha1.ModelAdapter) (ctrl.Result, error) { // check if the endpoint slice already exists, if not create a new one. found := &discoveryv1.EndpointSlice{} - err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}, found) - if err != nil && apierrors.IsNotFound(err) { - // EndpointSlice does not exist, create it - eps, err := buildModelAdapterEndpointSlice(instance, pod) - if err != nil { - klog.ErrorS(err, "Failed to define new EndpointSlice resource for ModelAdapter") - instance.Status.Phase = modelv1alpha1.ModelAdapterFailed - meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{ - Type: string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), - Status: metav1.ConditionFalse, - Reason: "Reconciling", - Message: fmt.Sprintf("Failed to create EndpointSlice for the custom resource (%s): (%s)", instance.Name, err), - LastTransitionTime: metav1.Now(), - }) + // instance could be clean up in earlier reconciliation, we need to clean up the instance from endpoint list. + if len(instance.Status.Instances) == 0 { + klog.Warningf("model adapter %s has not been deployed to any pods yet or being deleted, skip creating endpointslice", klog.KObj(instance)) - if err := r.Status().Update(ctx, instance); err != nil { - klog.Error(err, "Failed to update modelAdapter status") - return ctrl.Result{}, err + // reset endpoint slice + if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}, found); err != nil { + if apierrors.IsNotFound(err) { + klog.Warningf("Failed to fetch the endpoint slice %s", klog.KObj(instance)) } - return ctrl.Result{}, err } - // Set the owner reference - if err := ctrl.SetControllerReference(instance, eps, r.Scheme); err != nil { - klog.Error(err, "Failed to set controller reference to modelAdapter") + found.Endpoints = []discoveryv1.Endpoint{} + if err := r.Update(ctx, found); err != nil { + klog.ErrorS(err, "Failed to update EndpointSlice after clearing endpoints", "EndpointSlice", found.Name) return ctrl.Result{}, err } - // create service - klog.InfoS("Creating a new EndpointSlice", "endpointslice.namespace", eps.Namespace, "endpointslice.name", eps.Name) - if err = r.Create(ctx, eps); err != nil { - klog.ErrorS(err, "Failed to create new EndpointSlice resource", "endpointslice.namespace", eps.Namespace, "endpointslice.name", eps.Name) + return ctrl.Result{}, nil + } + + // TODO: do necessary refactor to support multiple lora instance + podName := instance.Status.Instances[0] + pod := &corev1.Pod{} + if err := r.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: podName}, pod); err != nil { + if !apierrors.IsNotFound(err) { + klog.Warning("Error getting Pod from lora instance list", err) return ctrl.Result{}, err } - } else if err != nil { - klog.ErrorS(err, "Failed to get EndpointSlice") - return ctrl.Result{}, err - } else { - // Check if pod is nil, and if so, clear the endpoints and set the phase to Pending - if pod == nil { - klog.InfoS("Pod is nil, clearing all endpoints and setting status to Pending") - found.Endpoints = []discoveryv1.Endpoint{} - if err := r.Update(ctx, found); err != nil { - klog.ErrorS(err, "Failed to update EndpointSlice after clearing endpoints", "EndpointSlice", found.Name) - return ctrl.Result{}, err + klog.Warningf("pod %s/%s has been deleted, let's clean up the endpoint slice", instance.GetNamespace(), podName) + // TODO: do necessary refactor to support multiple lora instance + // the tricky thing is we do not know the pod map to pod ip mapping. instance only save pods, endpointslice only save ips + if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}, found); err != nil { + if apierrors.IsNotFound(err) { + // this should barely happen, in this case, there's no need to move forward + klog.Warningf("Endpoint slice %s doesn't exist", klog.KObj(instance)) + return ctrl.Result{}, nil } + return ctrl.Result{}, err + } - instance.Status.Phase = modelv1alpha1.ModelAdapterPending - if err := r.Status().Update(ctx, instance); err != nil { - klog.Error(err, "Failed to update modelAdapter status to Pending") - return ctrl.Result{}, err - } + // reset endpoint slice + found.Endpoints = []discoveryv1.Endpoint{} + if err := r.Update(ctx, found); err != nil { + klog.ErrorS(err, "Failed to update EndpointSlice after clearing endpoints", "EndpointSlice", found.Name) + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil + } + + // pod object fetched, let's check existence of endpoint slice + err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}, found) + if err != nil { + if !apierrors.IsNotFound(err) { + klog.ErrorS(err, "Failed to get EndpointSlice") + return ctrl.Result{}, err + } - return ctrl.Result{}, nil + // EndpointSlice does not exist, create it + eps := buildModelAdapterEndpointSlice(instance, pod) + // Set the owner reference + if err := ctrl.SetControllerReference(instance, eps, r.Scheme); err != nil { + klog.Error(err, "Failed to set controller reference to modelAdapter") + return ctrl.Result{}, err } + // create endpoint slice + klog.InfoS("Creating a new EndpointSlice", "endpointslice", klog.KObj(eps)) + if err = r.Create(ctx, eps); err != nil { + klog.ErrorS(err, "Failed to create new EndpointSlice resource for ModelAdapter", "endpointslice", klog.KObj(eps)) + instance.Status.Phase = modelv1alpha1.ModelAdapterFailed + condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse, + FailedEndpointSliceCreateReason, fmt.Sprintf("Failed to create EndpointSlice for the custom resource (%s): (%s)", instance.Name, err)) + if err := r.updateStatus(ctx, instance, condition); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, err + } + instance.Status.Phase = modelv1alpha1.ModelAdapterRunning + } else { // Existing EndpointSlice Found. Check if the Pod IP is already in the EndpointSlice podIP := pod.Status.PodIP alreadyExists := false @@ -829,7 +829,7 @@ func (r *ModelAdapterReconciler) reconcileEndpointSlice(ctx context.Context, ins } } - // Append the Pod IP to the EndpointSlice if it doesn't already exist + // Append the Pod IP to the EndpointSlice if it doesn't exist if !alreadyExists { found.Endpoints = append(found.Endpoints, discoveryv1.Endpoint{ Addresses: []string{podIP}, @@ -871,10 +871,10 @@ func (r *ModelAdapterReconciler) reconcileEndpointSlice(ctx context.Context, ins return ctrl.Result{}, err } - instance.Status.Phase = modelv1alpha1.ModelAdapterPending + instance.Status.Phase = modelv1alpha1.ModelAdapterFailed klog.InfoS("Successfully removed Pod IP from EndpointSlice", "PodIP", podIP, "EndpointSlice", found.Name) } else { - klog.InfoS("Pod IP already exists in EndpointSlice", "PodName", pod.Name, "PodIP", podIP) + klog.V(4).InfoS("Pod IP already exists in EndpointSlice", "PodName", pod.Name, "PodIP", podIP) instance.Status.Phase = modelv1alpha1.ModelAdapterRunning } } diff --git a/pkg/controller/modeladapter/resources.go b/pkg/controller/modeladapter/resources.go index 23c3186c..2c4749ec 100644 --- a/pkg/controller/modeladapter/resources.go +++ b/pkg/controller/modeladapter/resources.go @@ -22,9 +22,10 @@ import ( discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" ) -func buildModelAdapterEndpointSlice(instance *modelv1alpha1.ModelAdapter, pod *corev1.Pod) (*discoveryv1.EndpointSlice, error) { +func buildModelAdapterEndpointSlice(instance *modelv1alpha1.ModelAdapter, pod *corev1.Pod) *discoveryv1.EndpointSlice { serviceLabels := map[string]string{ "kubernetes.io/service-name": instance.Name, } @@ -37,9 +38,9 @@ func buildModelAdapterEndpointSlice(instance *modelv1alpha1.ModelAdapter, pod *c ports := []discoveryv1.EndpointPort{ { - Name: stringPtr("http"), - Protocol: protocolPtr(corev1.ProtocolTCP), - Port: int32Ptr(8000), + Name: ptr.To("http"), + Protocol: ptr.To(corev1.ProtocolTCP), + Port: ptr.To(int32(8000)), }, } @@ -56,10 +57,10 @@ func buildModelAdapterEndpointSlice(instance *modelv1alpha1.ModelAdapter, pod *c AddressType: discoveryv1.AddressTypeIPv4, Endpoints: addresses, Ports: ports, - }, nil + } } -func buildModelAdapterService(instance *modelv1alpha1.ModelAdapter) (*corev1.Service, error) { +func buildModelAdapterService(instance *modelv1alpha1.ModelAdapter) *corev1.Service { labels := map[string]string{ "model.aibrix.ai/name": instance.Spec.BaseModel, "adapter.model.aibrix.ai/name": instance.Name, @@ -94,5 +95,5 @@ func buildModelAdapterService(instance *modelv1alpha1.ModelAdapter) (*corev1.Ser PublishNotReadyAddresses: true, Ports: ports, }, - }, nil + } } diff --git a/pkg/controller/modeladapter/resources_test.go b/pkg/controller/modeladapter/resources_test.go index 31a5e921..f0b26fad 100644 --- a/pkg/controller/modeladapter/resources_test.go +++ b/pkg/controller/modeladapter/resources_test.go @@ -43,10 +43,7 @@ func TestBuildModelAdapterEndpointSlice(t *testing.T) { } // Call the function to test - endpointSlice, err := buildModelAdapterEndpointSlice(instance, pod) - - // Assert no errors - assert.NoError(t, err) + endpointSlice := buildModelAdapterEndpointSlice(instance, pod) // Check EndpointSlice metadata assert.Equal(t, "test-instance", endpointSlice.Name) @@ -83,10 +80,7 @@ func TestBuildModelAdapterService(t *testing.T) { } // Call the function to test - service, err := buildModelAdapterService(instance) - - // Assert no errors - assert.NoError(t, err) + service := buildModelAdapterService(instance) // Check Service metadata assert.Equal(t, "test-instance", service.Name) diff --git a/pkg/controller/modeladapter/scheduling/leastadapters.go b/pkg/controller/modeladapter/scheduling/leastadapters.go index 0f855a82..e2dda463 100644 --- a/pkg/controller/modeladapter/scheduling/leastadapters.go +++ b/pkg/controller/modeladapter/scheduling/leastadapters.go @@ -50,6 +50,6 @@ func (r leastAdapters) SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, e } } - klog.Infof("pod selected with least model adapters: %s", selectedPod.Name) + klog.InfoS("pod selected with least model adapters", "pod", klog.KObj(&selectedPod)) return &selectedPod, nil } diff --git a/pkg/controller/modeladapter/utils.go b/pkg/controller/modeladapter/utils.go index 350f317d..41fd195a 100644 --- a/pkg/controller/modeladapter/utils.go +++ b/pkg/controller/modeladapter/utils.go @@ -23,27 +23,10 @@ import ( "os" "strings" - corev1 "k8s.io/api/core/v1" - modelv1alpha1 "github.com/aibrix/aibrix/api/model/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func stringPtr(s string) *string { - return &s -} - -func protocolPtr(p corev1.Protocol) *corev1.Protocol { - return &p -} - -func int32Ptr(i int32) *int32 { - return &i -} - -func mapPtr(m map[string]string) *map[string]string { - return &m -} - func validateModelAdapter(instance *modelv1alpha1.ModelAdapter) error { if instance.Spec.ArtifactURL == "" { return fmt.Errorf("artifactURL is required") @@ -128,7 +111,7 @@ func extractHuggingFacePath(artifactURL string) (string, error) { return path, nil } -func stringInSlice(slice []string, str string) bool { +func StringInSlice(slice []string, str string) bool { for _, v := range slice { if v == str { return true @@ -136,3 +119,25 @@ func stringInSlice(slice []string, str string) bool { } return false } + +// RemoveInstanceFromList removes a string from a slice of strings +func RemoveInstanceFromList(slice []string, strToRemove string) []string { + var result []string + for _, s := range slice { + if s != strToRemove { + result = append(result, s) + } + } + return result +} + +// NewCondition creates a new condition. +func NewCondition(condType string, status metav1.ConditionStatus, reason, msg string) metav1.Condition { + return metav1.Condition{ + Type: condType, + Status: status, + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: msg, + } +} diff --git a/pkg/controller/modeladapter/utils_test.go b/pkg/controller/modeladapter/utils_test.go index 80033ef2..136f31c4 100644 --- a/pkg/controller/modeladapter/utils_test.go +++ b/pkg/controller/modeladapter/utils_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" modelv1alpha1 "github.com/aibrix/aibrix/api/model/v1alpha1" ) @@ -36,7 +37,7 @@ func TestValidateModelAdapter(t *testing.T) { PodSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{"app": "test"}, }, - Replicas: int32Ptr(1), + Replicas: ptr.To(int32(1)), }, }