Skip to content

Commit

Permalink
feat: update of functions related to nodeclaim and machine for RAG en…
Browse files Browse the repository at this point in the history
…gine (#620)

**Reason for Change**:
update of functions related to nodeclaim and machine for RAG engine

**Requirements**

- [ ] added unit tests and e2e tests (if applicable).

**Issue Fixed**:
<!-- If this PR fixes GitHub issue 4321, add "Fixes #4321" to the next
line. -->

**Notes for Reviewers**:

---------

Signed-off-by: Bangqi Zhu <bangqizhu@microsoft.com>
Co-authored-by: Bangqi Zhu <bangqizhu@microsoft.com>
  • Loading branch information
bangqipropel and Bangqi Zhu authored Oct 9, 2024
1 parent 38656dd commit f613bb4
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 50 deletions.
6 changes: 6 additions & 0 deletions api/v1alpha1/workspace_labels.go → api/v1alpha1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ const (
// LabelWorkspaceName is the label for workspace name.
LabelWorkspaceName = KAITOPrefix + "workspace"

// LabelRAGEngineName is the label for ragengine name.
LabelRAGEngineName = KAITOPrefix + "ragengine"

// LabelWorkspaceNamespace is the label for workspace namespace.
LabelWorkspaceNamespace = KAITOPrefix + "workspacenamespace"

// LabelRAGEngineNamespace is the label for ragengine namespace.
LabelRAGEngineNamespace = KAITOPrefix + "ragenginenamespace"

// WorkspaceRevisionAnnotation is the annotation key for the workspace revision number.
WorkspaceRevisionAnnotation = "workspace.kaito.io/revision"
)
4 changes: 2 additions & 2 deletions pkg/controllers/workspace_gc_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (c *WorkspaceReconciler) garbageCollectWorkspace(ctx context.Context, wObj
klog.InfoS("garbageCollectWorkspace", "workspace", klog.KObj(wObj))

// Check if there are any machines associated with this workspace.
mList, err := machine.ListMachinesByWorkspace(ctx, wObj, c.Client)
mList, err := machine.ListMachines(ctx, wObj, c.Client)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -36,7 +36,7 @@ func (c *WorkspaceReconciler) garbageCollectWorkspace(ctx context.Context, wObj

if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
// Check if there are any nodeClaims associated with this workspace.
ncList, err := nodeclaim.ListNodeClaimByWorkspace(ctx, wObj, c.Client)
ncList, err := nodeclaim.ListNodeClaim(ctx, wObj, c.Client)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
70 changes: 49 additions & 21 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1"
"github.com/azure/kaito/pkg/resources"
"github.com/azure/kaito/pkg/utils/consts"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
Expand All @@ -37,25 +38,32 @@ var (
machineStatusTimeoutInterval = 240 * time.Second
)

// GenerateMachineManifest generates a machine object from the given workspace.
func GenerateMachineManifest(ctx context.Context, storageRequirement string, workspaceObj *kaitov1alpha1.Workspace) *v1alpha5.Machine {
digest := sha256.Sum256([]byte(workspaceObj.Namespace + workspaceObj.Name + time.Now().Format("2006-01-02 15:04:05.000000000"))) // We make sure the machine name is not fixed to the a workspace
// GenerateMachineManifest generates a machine object from the given workspace or RAGEngine.
func GenerateMachineManifest(ctx context.Context, storageRequirement string, obj interface{}) *v1alpha5.Machine {

// Determine the type of the input object and extract relevant fields
instanceType, namespace, name, labelSelector, err := resources.ExtractObjFields(obj)
if err != nil {
klog.Error(err)
return nil
}

digest := sha256.Sum256([]byte(namespace + name + time.Now().Format("2006-01-02 15:04:05.000000000"))) // We make sure the machine name is not fixed to the object
machineName := "ws" + hex.EncodeToString(digest[0:])[0:9]
machineLabels := map[string]string{
LabelProvisionerName: ProvisionerName,
kaitov1alpha1.LabelWorkspaceName: workspaceObj.Name,
kaitov1alpha1.LabelWorkspaceNamespace: workspaceObj.Namespace,
kaitov1alpha1.LabelWorkspaceName: name,
kaitov1alpha1.LabelWorkspaceNamespace: namespace,
}
if workspaceObj.Resource.LabelSelector != nil &&
len(workspaceObj.Resource.LabelSelector.MatchLabels) != 0 {
machineLabels = lo.Assign(machineLabels, workspaceObj.Resource.LabelSelector.MatchLabels)

if labelSelector != nil && len(labelSelector.MatchLabels) != 0 {
machineLabels = lo.Assign(machineLabels, labelSelector.MatchLabels)
}

return &v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: machineName,
Namespace: workspaceObj.Namespace,
Namespace: namespace,
Labels: machineLabels,
},
Spec: v1alpha5.MachineSpec{
Expand All @@ -66,7 +74,7 @@ func GenerateMachineManifest(ctx context.Context, storageRequirement string, wor
{
Key: v1.LabelInstanceTypeStable,
Operator: v1.NodeSelectorOpIn,
Values: []string{workspaceObj.Resource.InstanceType},
Values: []string{instanceType},
},
{
Key: LabelProvisionerName,
Expand Down Expand Up @@ -134,26 +142,34 @@ func CreateMachine(ctx context.Context, machineObj *v1alpha5.Machine, kubeClient
}

// WaitForPendingMachines checks if there are any machines in provisioning condition. If so, wait until they are ready.
func WaitForPendingMachines(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) error {
machines, err := ListMachinesByWorkspace(ctx, workspaceObj, kubeClient)
func WaitForPendingMachines(ctx context.Context, obj interface{}, kubeClient client.Client) error {
var instanceType string

// Determine the type of the input object and retrieve the InstanceType
instanceType, _, _, _, err := resources.ExtractObjFields(obj)
if err != nil {
return err
}

machines, err := ListMachines(ctx, obj, kubeClient)
if err != nil {
return err
}

for i := range machines.Items {
// check if the machine is being created has the requested workspace instance type.
// check if the machine is being created and has the requested instance type
_, machineInstanceType := lo.Find(machines.Items[i].Spec.Requirements, func(requirement v1.NodeSelectorRequirement) bool {
return requirement.Key == v1.LabelInstanceTypeStable &&
requirement.Operator == v1.NodeSelectorOpIn &&
lo.Contains(requirement.Values, workspaceObj.Resource.InstanceType)
lo.Contains(requirement.Values, instanceType)
})
if machineInstanceType {
_, found := lo.Find(machines.Items[i].GetConditions(), func(condition apis.Condition) bool {
return condition.Type == v1alpha5.MachineInitialized && condition.Status == v1.ConditionFalse
})

if found || machines.Items[i].GetConditions() == nil { // checking conditions==nil is a workaround for conditions delaying to set on the machine object.
//wait until machine is initialized.
if found || machines.Items[i].GetConditions() == nil { // conditions==nil is also treated as pending: a workaround for conditions being set on the machine object with a delay
// wait until the machine is initialized
if err := CheckMachineStatus(ctx, &machines.Items[i], kubeClient); err != nil {
return err
}
Expand All @@ -163,13 +179,25 @@ func WaitForPendingMachines(ctx context.Context, workspaceObj *kaitov1alpha1.Wor
return nil
}

// ListMachinesByWorkspace list all machine objects in the cluster that are created by the workspace identified by the label.
func ListMachinesByWorkspace(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) (*v1alpha5.MachineList, error) {
// ListMachines lists all machine objects in the cluster that are created by the given workspace or RAGEngine.
func ListMachines(ctx context.Context, obj interface{}, kubeClient client.Client) (*v1alpha5.MachineList, error) {
machineList := &v1alpha5.MachineList{}

ls := labels.Set{
kaitov1alpha1.LabelWorkspaceName: workspaceObj.Name,
kaitov1alpha1.LabelWorkspaceNamespace: workspaceObj.Namespace,
var ls labels.Set

switch o := obj.(type) {
case *kaitov1alpha1.Workspace:
ls = labels.Set{
kaitov1alpha1.LabelWorkspaceName: o.Name,
kaitov1alpha1.LabelWorkspaceNamespace: o.Namespace,
}
case *kaitov1alpha1.RAGEngine:
ls = labels.Set{
kaitov1alpha1.LabelRAGEngineName: o.Name,
kaitov1alpha1.LabelRAGEngineNamespace: o.Namespace,
}
default:
return nil, fmt.Errorf("unsupported object type: %T", obj)
}

err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
Expand Down
87 changes: 60 additions & 27 deletions pkg/nodeclaim/nodeclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
azurev1alpha2 "github.com/Azure/karpenter-provider-azure/pkg/apis/v1alpha2"
awsv1beta1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1"
kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1"
"github.com/azure/kaito/pkg/resources"
"github.com/azure/kaito/pkg/utils/consts"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
Expand All @@ -41,20 +42,26 @@ var (
nodeClaimStatusTimeoutInterval = 240 * time.Second
)

// GenerateNodeClaimManifest generates a nodeClaim object from the given workspace.
func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, workspaceObj *kaitov1alpha1.Workspace) *v1beta1.NodeClaim {
klog.InfoS("GenerateNodeClaimManifest", "workspace", klog.KObj(workspaceObj))
// GenerateNodeClaimManifest generates a nodeClaim object from the given workspace or RAGEngine.
func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, obj interface{}) *v1beta1.NodeClaim {
klog.InfoS("GenerateNodeClaimManifest", "object", obj)

nodeClaimName := GenerateNodeClaimName(workspaceObj)
// Determine the type of the input object and extract relevant fields
instanceType, namespace, name, labelSelector, err := resources.ExtractObjFields(obj)
if err != nil {
klog.Error(err)
return nil
}

nodeClaimName := GenerateNodeClaimName(obj)

nodeClaimLabels := map[string]string{
LabelNodePool: KaitoNodePoolName, // Fake nodepool name to prevent Karpenter from scaling up.
kaitov1alpha1.LabelWorkspaceName: workspaceObj.Name,
kaitov1alpha1.LabelWorkspaceNamespace: workspaceObj.Namespace,
kaitov1alpha1.LabelWorkspaceName: name,
kaitov1alpha1.LabelWorkspaceNamespace: namespace,
}
if workspaceObj.Resource.LabelSelector != nil &&
len(workspaceObj.Resource.LabelSelector.MatchLabels) != 0 {
nodeClaimLabels = lo.Assign(nodeClaimLabels, workspaceObj.Resource.LabelSelector.MatchLabels)
if labelSelector != nil && len(labelSelector.MatchLabels) != 0 {
nodeClaimLabels = lo.Assign(nodeClaimLabels, labelSelector.MatchLabels)
}

nodeClaimAnnotations := map[string]string{
Expand All @@ -73,7 +80,7 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, w
nodeClaimObj := &v1beta1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: nodeClaimName,
Namespace: workspaceObj.Namespace,
Namespace: namespace,
Labels: nodeClaimLabels,
Annotations: nodeClaimAnnotations,
},
Expand Down Expand Up @@ -101,7 +108,7 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, w
NodeSelectorRequirement: v1.NodeSelectorRequirement{
Key: v1.LabelInstanceTypeStable,
Operator: v1.NodeSelectorOpIn,
Values: []string{workspaceObj.Resource.InstanceType},
Values: []string{instanceType},
},
},
{
Expand All @@ -125,17 +132,23 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, w
NodeSelectorRequirement: v1.NodeSelectorRequirement{
Key: azurev1alpha2.LabelSKUName,
Operator: v1.NodeSelectorOpIn,
Values: []string{workspaceObj.Resource.InstanceType},
Values: []string{instanceType},
},
}
nodeClaimObj.Spec.Requirements = append(nodeClaimObj.Spec.Requirements, nodeSelector)
}
return nodeClaimObj
}

func GenerateNodeClaimName(workspaceObj *kaitov1alpha1.Workspace) string {
digest := sha256.Sum256([]byte(workspaceObj.Namespace + workspaceObj.Name + time.Now().
Format("2006-01-02 15:04:05.000000000"))) // We make sure the nodeClaim name is not fixed to the workspace
// GenerateNodeClaimName builds a nodeClaim name for the given workspace or
// RAGEngine object.
//
// The name is "ws" followed by the first nine hex characters of the SHA-256
// digest of the object's namespace, name and the current timestamp, so
// repeated calls for the same object yield different names. It returns an
// empty string when resources.ExtractObjFields reports an error for obj.
func GenerateNodeClaimName(obj interface{}) string {
	_, namespace, name, _, err := resources.ExtractObjFields(obj)
	if err != nil {
		return ""
	}

	// Mixing in the current time keeps the name from being fixed to the object.
	seed := namespace + name + time.Now().Format("2006-01-02 15:04:05.000000000")
	sum := sha256.Sum256([]byte(seed))
	return "ws" + hex.EncodeToString(sum[:])[:9]
}
Expand Down Expand Up @@ -233,27 +246,34 @@ func CreateKarpenterNodeClass(ctx context.Context, kubeClient client.Client) err
}
}

// WaitForPendingNodeClaims checks if the there are any nodeClaims in provisioning condition. If so, wait until they are ready.
func WaitForPendingNodeClaims(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) error {
nodeClaims, err := ListNodeClaimByWorkspace(ctx, workspaceObj, kubeClient)
// WaitForPendingNodeClaims checks if there are any nodeClaims in provisioning condition. If so, wait until they are ready.
func WaitForPendingNodeClaims(ctx context.Context, obj interface{}, kubeClient client.Client) error {

// Determine the type of the input object and retrieve the InstanceType
instanceType, _, _, _, err := resources.ExtractObjFields(obj)
if err != nil {
return err
}

nodeClaims, err := ListNodeClaim(ctx, obj, kubeClient)
if err != nil {
return err
}

for i := range nodeClaims.Items {
// check if the nodeClaim is being created has the requested workspace instance type.
// check if the nodeClaim being created has the requested instance type
_, nodeClaimInstanceType := lo.Find(nodeClaims.Items[i].Spec.Requirements, func(requirement v1beta1.NodeSelectorRequirementWithMinValues) bool {
return requirement.Key == v1.LabelInstanceTypeStable &&
requirement.Operator == v1.NodeSelectorOpIn &&
lo.Contains(requirement.Values, workspaceObj.Resource.InstanceType)
lo.Contains(requirement.Values, instanceType)
})
if nodeClaimInstanceType {
_, found := lo.Find(nodeClaims.Items[i].GetConditions(), func(condition apis.Condition) bool {
return condition.Type == v1beta1.Initialized && condition.Status == v1.ConditionFalse
})

if found || nodeClaims.Items[i].GetConditions() == nil { // checking conditions==nil is a workaround for conditions delaying to set on the nodeClaim object.
//wait until nodeClaim is initialized.
if found || nodeClaims.Items[i].GetConditions() == nil { // conditions==nil is also treated as pending: a workaround for conditions being set on the nodeClaim object with a delay
// wait until the nodeClaim is initialized
if err := CheckNodeClaimStatus(ctx, &nodeClaims.Items[i], kubeClient); err != nil {
return err
}
Expand All @@ -263,13 +283,26 @@ func WaitForPendingNodeClaims(ctx context.Context, workspaceObj *kaitov1alpha1.W
return nil
}

// ListNodeClaimByWorkspace list all nodeClaim objects in the cluster that are created by the workspace identified by the label.
func ListNodeClaimByWorkspace(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) (*v1beta1.NodeClaimList, error) {
// ListNodeClaim lists all nodeClaim objects in the cluster that are created by the given workspace or RAGEngine.
func ListNodeClaim(ctx context.Context, obj interface{}, kubeClient client.Client) (*v1beta1.NodeClaimList, error) {
nodeClaimList := &v1beta1.NodeClaimList{}

ls := labels.Set{
kaitov1alpha1.LabelWorkspaceName: workspaceObj.Name,
kaitov1alpha1.LabelWorkspaceNamespace: workspaceObj.Namespace,
var ls labels.Set

// Build label selector based on the type of the input object
switch o := obj.(type) {
case *kaitov1alpha1.Workspace:
ls = labels.Set{
kaitov1alpha1.LabelWorkspaceName: o.Name,
kaitov1alpha1.LabelWorkspaceNamespace: o.Namespace,
}
case *kaitov1alpha1.RAGEngine:
ls = labels.Set{
kaitov1alpha1.LabelRAGEngineName: o.Name,
kaitov1alpha1.LabelRAGEngineNamespace: o.Namespace,
}
default:
return nil, fmt.Errorf("unsupported object type: %T", obj)
}

err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
Expand Down
20 changes: 20 additions & 0 deletions pkg/resources/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"context"
"fmt"

kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -87,3 +89,21 @@ func CheckNvidiaPlugin(ctx context.Context, nodeObj *corev1.Node) bool {
}
return false
}

// ExtractObjFields returns the instance type, namespace, name and label
// selector carried by obj.
//
// obj must be a *kaitov1alpha1.Workspace (fields read from its Resource) or a
// *kaitov1alpha1.RAGEngine (fields read from its Spec.Compute). For any other
// dynamic type the zero values are returned together with an
// "unsupported object type" error.
func ExtractObjFields(obj interface{}) (instanceType, namespace, name string, labelSelector *metav1.LabelSelector, err error) {
	if ws, ok := obj.(*kaitov1alpha1.Workspace); ok {
		return ws.Resource.InstanceType, ws.Namespace, ws.Name, ws.Resource.LabelSelector, nil
	}
	if rag, ok := obj.(*kaitov1alpha1.RAGEngine); ok {
		return rag.Spec.Compute.InstanceType, rag.Namespace, rag.Name, rag.Spec.Compute.LabelSelector, nil
	}
	return "", "", "", nil, fmt.Errorf("unsupported object type: %T", obj)
}

0 comments on commit f613bb4

Please sign in to comment.