Skip to content

Commit

Permalink
update of functions related to nodeclaim and machine for reuse
Browse files Browse the repository at this point in the history
Signed-off-by: Bangqi Zhu <bangqizhu@microsoft.com>
  • Loading branch information
Bangqi Zhu committed Oct 9, 2024
1 parent 6499e62 commit f8fd204
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 48 deletions.
86 changes: 65 additions & 21 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,45 @@ var (
machineStatusTimeoutInterval = 240 * time.Second
)

// GenerateMachineManifest generates a machine object from the given workspace.
func GenerateMachineManifest(ctx context.Context, storageRequirement string, workspaceObj *kaitov1alpha1.Workspace) *v1alpha5.Machine {
digest := sha256.Sum256([]byte(workspaceObj.Namespace + workspaceObj.Name + time.Now().Format("2006-01-02 15:04:05.000000000"))) // We make sure the machine name is not fixed to the a workspace
// GenerateMachineManifest generates a machine object from the given workspace or RAGEngine.
func GenerateMachineManifest(ctx context.Context, storageRequirement string, obj interface{}) *v1alpha5.Machine {
var instanceType string
var namespace, name string
var labelSelector *metav1.LabelSelector

// Determine the type of the input object and extract relevant fields
switch o := obj.(type) {
case *kaitov1alpha1.Workspace:
instanceType = o.Resource.InstanceType
namespace = o.Namespace
name = o.Name
labelSelector = o.Resource.LabelSelector
case *kaitov1alpha1.RAGEngine:
instanceType = o.Spec.Compute.InstanceType
namespace = o.Namespace
name = o.Name
labelSelector = o.Spec.Compute.LabelSelector
default:
klog.Error("unsupported object type", obj)
return nil
}

digest := sha256.Sum256([]byte(namespace + name + time.Now().Format("2006-01-02 15:04:05.000000000"))) // We make sure the nodeClaim name is not fixed to the object
machineName := "ws" + hex.EncodeToString(digest[0:])[0:9]
machineLabels := map[string]string{
LabelProvisionerName: ProvisionerName,
kaitov1alpha1.LabelWorkspaceName: workspaceObj.Name,
kaitov1alpha1.LabelWorkspaceNamespace: workspaceObj.Namespace,
kaitov1alpha1.LabelWorkspaceName: name,
kaitov1alpha1.LabelWorkspaceNamespace: namespace,
}
if workspaceObj.Resource.LabelSelector != nil &&
len(workspaceObj.Resource.LabelSelector.MatchLabels) != 0 {
machineLabels = lo.Assign(machineLabels, workspaceObj.Resource.LabelSelector.MatchLabels)

if labelSelector != nil && len(labelSelector.MatchLabels) != 0 {
machineLabels = lo.Assign(machineLabels, labelSelector.MatchLabels)
}

return &v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: machineName,
Namespace: workspaceObj.Namespace,
Namespace: namespace,
Labels: machineLabels,
},
Spec: v1alpha5.MachineSpec{
Expand All @@ -66,7 +86,7 @@ func GenerateMachineManifest(ctx context.Context, storageRequirement string, wor
{
Key: v1.LabelInstanceTypeStable,
Operator: v1.NodeSelectorOpIn,
Values: []string{workspaceObj.Resource.InstanceType},
Values: []string{instanceType},
},
{
Key: LabelProvisionerName,
Expand Down Expand Up @@ -134,26 +154,38 @@ func CreateMachine(ctx context.Context, machineObj *v1alpha5.Machine, kubeClient
}

// WaitForPendingMachines checks if there are any machines in provisioning condition. If so, wait until they are ready.
func WaitForPendingMachines(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) error {
machines, err := ListMachinesByWorkspace(ctx, workspaceObj, kubeClient)
func WaitForPendingMachines(ctx context.Context, obj interface{}, kubeClient client.Client) error {
var instanceType string

// Determine the type of the input object and retrieve the InstanceType
switch o := obj.(type) {
case *kaitov1alpha1.Workspace:
instanceType = o.Resource.InstanceType
case *kaitov1alpha1.RAGEngine:
instanceType = o.Spec.Compute.InstanceType
default:
return fmt.Errorf("unsupported object type: %T", obj)
}

machines, err := ListMachinesByWorkspace(ctx, obj, kubeClient)
if err != nil {
return err
}

for i := range machines.Items {
// check if the machine is being created has the requested workspace instance type.
// check if the machine is being created and has the requested instance type
_, machineInstanceType := lo.Find(machines.Items[i].Spec.Requirements, func(requirement v1.NodeSelectorRequirement) bool {
return requirement.Key == v1.LabelInstanceTypeStable &&
requirement.Operator == v1.NodeSelectorOpIn &&
lo.Contains(requirement.Values, workspaceObj.Resource.InstanceType)
lo.Contains(requirement.Values, instanceType)
})
if machineInstanceType {
_, found := lo.Find(machines.Items[i].GetConditions(), func(condition apis.Condition) bool {
return condition.Type == v1alpha5.MachineInitialized && condition.Status == v1.ConditionFalse
})

if found || machines.Items[i].GetConditions() == nil { // checking conditions==nil is a workaround for conditions delaying to set on the machine object.
//wait until machine is initialized.
if found || machines.Items[i].GetConditions() == nil { // Check if conditions==nil is a workaround for condition delays in setting the machine object
// wait until the machine is initialized
if err := CheckMachineStatus(ctx, &machines.Items[i], kubeClient); err != nil {
return err
}
Expand All @@ -163,13 +195,25 @@ func WaitForPendingMachines(ctx context.Context, workspaceObj *kaitov1alpha1.Wor
return nil
}

// ListMachinesByWorkspace list all machine objects in the cluster that are created by the workspace identified by the label.
func ListMachinesByWorkspace(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) (*v1alpha5.MachineList, error) {
// ListMachinesByWorkspace lists all machine objects in the cluster that are created by the given workspace or RAGEngine.
func ListMachinesByWorkspace(ctx context.Context, obj interface{}, kubeClient client.Client) (*v1alpha5.MachineList, error) {
machineList := &v1alpha5.MachineList{}

ls := labels.Set{
kaitov1alpha1.LabelWorkspaceName: workspaceObj.Name,
kaitov1alpha1.LabelWorkspaceNamespace: workspaceObj.Namespace,
var ls labels.Set

switch o := obj.(type) {
case *kaitov1alpha1.Workspace:
ls = labels.Set{
kaitov1alpha1.LabelWorkspaceName: o.Name,
kaitov1alpha1.LabelWorkspaceNamespace: o.Namespace,
}
case *kaitov1alpha1.RAGEngine:
ls = labels.Set{
kaitov1alpha1.LabelRAGEngineName: o.Name,

Check failure on line 212 in pkg/machine/machine.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: kaitov1alpha1.LabelRAGEngineName

Check failure on line 212 in pkg/machine/machine.go

View workflow job for this annotation

GitHub Actions / build

undefined: kaitov1alpha1.LabelRAGEngineName
kaitov1alpha1.LabelRAGEngineNamespace: o.Namespace,

Check failure on line 213 in pkg/machine/machine.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: kaitov1alpha1.LabelRAGEngineNamespace

Check failure on line 213 in pkg/machine/machine.go

View workflow job for this annotation

GitHub Actions / build

undefined: kaitov1alpha1.LabelRAGEngineNamespace
}
default:
return nil, fmt.Errorf("unsupported object type: %T", obj)
}

err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
Expand Down
113 changes: 86 additions & 27 deletions pkg/nodeclaim/nodeclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,39 @@ var (
nodeClaimStatusTimeoutInterval = 240 * time.Second
)

// GenerateNodeClaimManifest generates a nodeClaim object from the given workspace.
func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, workspaceObj *kaitov1alpha1.Workspace) *v1beta1.NodeClaim {
klog.InfoS("GenerateNodeClaimManifest", "workspace", klog.KObj(workspaceObj))
// GenerateNodeClaimManifest generates a nodeClaim object from the given workspace or RAGEngine.
func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, obj interface{}) *v1beta1.NodeClaim {
klog.InfoS("GenerateNodeClaimManifest", "object", obj)

var namespace, name, instanceType string
var labelSelector *metav1.LabelSelector

// Determine the type of the input object and extract relevant fields
switch o := obj.(type) {
case *kaitov1alpha1.Workspace:
namespace = o.Namespace
name = o.Name
instanceType = o.Resource.InstanceType
labelSelector = o.Resource.LabelSelector
case *kaitov1alpha1.RAGEngine:
namespace = o.Namespace
name = o.Name
instanceType = o.Spec.Compute.InstanceType
labelSelector = o.Spec.Compute.LabelSelector
default:
klog.Error("unsupported object type", obj)
return nil
}

nodeClaimName := GenerateNodeClaimName(workspaceObj)
nodeClaimName := GenerateNodeClaimName(obj)

nodeClaimLabels := map[string]string{
LabelNodePool: KaitoNodePoolName, // Fake nodepool name to prevent Karpenter from scaling up.
kaitov1alpha1.LabelWorkspaceName: workspaceObj.Name,
kaitov1alpha1.LabelWorkspaceNamespace: workspaceObj.Namespace,
kaitov1alpha1.LabelWorkspaceName: name,
kaitov1alpha1.LabelWorkspaceNamespace: namespace,
}
if workspaceObj.Resource.LabelSelector != nil &&
len(workspaceObj.Resource.LabelSelector.MatchLabels) != 0 {
nodeClaimLabels = lo.Assign(nodeClaimLabels, workspaceObj.Resource.LabelSelector.MatchLabels)
if labelSelector != nil && len(labelSelector.MatchLabels) != 0 {
nodeClaimLabels = lo.Assign(nodeClaimLabels, labelSelector.MatchLabels)
}

nodeClaimAnnotations := map[string]string{
Expand All @@ -73,7 +92,7 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, w
nodeClaimObj := &v1beta1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: nodeClaimName,
Namespace: workspaceObj.Namespace,
Namespace: namespace,
Labels: nodeClaimLabels,
Annotations: nodeClaimAnnotations,
},
Expand Down Expand Up @@ -101,7 +120,7 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, w
NodeSelectorRequirement: v1.NodeSelectorRequirement{
Key: v1.LabelInstanceTypeStable,
Operator: v1.NodeSelectorOpIn,
Values: []string{workspaceObj.Resource.InstanceType},
Values: []string{instanceType},
},
},
{
Expand All @@ -125,17 +144,32 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, w
NodeSelectorRequirement: v1.NodeSelectorRequirement{
Key: azurev1alpha2.LabelSKUName,
Operator: v1.NodeSelectorOpIn,
Values: []string{workspaceObj.Resource.InstanceType},
Values: []string{instanceType},
},
}
nodeClaimObj.Spec.Requirements = append(nodeClaimObj.Spec.Requirements, nodeSelector)
}
return nodeClaimObj
}

func GenerateNodeClaimName(workspaceObj *kaitov1alpha1.Workspace) string {
digest := sha256.Sum256([]byte(workspaceObj.Namespace + workspaceObj.Name + time.Now().
Format("2006-01-02 15:04:05.000000000"))) // We make sure the nodeClaim name is not fixed to the workspace
// GenerateNodeClaimName builds a nodeClaim name for the given owner object.
// Supported owners are *kaitov1alpha1.Workspace and *kaitov1alpha1.RAGEngine;
// any other type yields an empty string.
//
// The name is "ws" followed by the first 9 hex characters of the SHA-256
// digest of the owner's namespace, its name, and the current time formatted
// with nanosecond precision.
func GenerateNodeClaimName(obj interface{}) string {
	// Identity of the owner: namespace immediately followed by name.
	var ownerKey string

	switch owner := obj.(type) {
	case *kaitov1alpha1.Workspace:
		ownerKey = owner.Namespace + owner.Name
	case *kaitov1alpha1.RAGEngine:
		ownerKey = owner.Namespace + owner.Name
	default:
		// Unsupported owner type: no name can be derived.
		return ""
	}

	// Mixing in the current timestamp keeps the nodeClaim name from being fixed to the object.
	stamp := time.Now().Format("2006-01-02 15:04:05.000000000")
	sum := sha256.Sum256([]byte(ownerKey + stamp))

	return "ws" + hex.EncodeToString(sum[:])[:9]
}
Expand Down Expand Up @@ -233,27 +267,39 @@ func CreateKarpenterNodeClass(ctx context.Context, kubeClient client.Client) err
}
}

// WaitForPendingNodeClaims checks if the there are any nodeClaims in provisioning condition. If so, wait until they are ready.
func WaitForPendingNodeClaims(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) error {
nodeClaims, err := ListNodeClaimByWorkspace(ctx, workspaceObj, kubeClient)
// WaitForPendingNodeClaims checks if there are any nodeClaims in provisioning condition. If so, wait until they are ready.
func WaitForPendingNodeClaims(ctx context.Context, obj interface{}, kubeClient client.Client) error {
var instanceType string

// Determine the type of the input object and retrieve the InstanceType
switch o := obj.(type) {
case *kaitov1alpha1.Workspace:
instanceType = o.Resource.InstanceType
case *kaitov1alpha1.RAGEngine:
instanceType = o.Spec.Compute.InstanceType
default:
return fmt.Errorf("unsupported object type: %T", obj)
}

nodeClaims, err := ListNodeClaimByWorkspace(ctx, obj, kubeClient)
if err != nil {
return err
}

for i := range nodeClaims.Items {
// check if the nodeClaim is being created has the requested workspace instance type.
// check if the nodeClaim being created has the requested instance type
_, nodeClaimInstanceType := lo.Find(nodeClaims.Items[i].Spec.Requirements, func(requirement v1beta1.NodeSelectorRequirementWithMinValues) bool {
return requirement.Key == v1.LabelInstanceTypeStable &&
requirement.Operator == v1.NodeSelectorOpIn &&
lo.Contains(requirement.Values, workspaceObj.Resource.InstanceType)
lo.Contains(requirement.Values, instanceType)
})
if nodeClaimInstanceType {
_, found := lo.Find(nodeClaims.Items[i].GetConditions(), func(condition apis.Condition) bool {
return condition.Type == v1beta1.Initialized && condition.Status == v1.ConditionFalse
})

if found || nodeClaims.Items[i].GetConditions() == nil { // checking conditions==nil is a workaround for conditions delaying to set on the nodeClaim object.
//wait until nodeClaim is initialized.
if found || nodeClaims.Items[i].GetConditions() == nil { // Check if conditions==nil is a workaround for condition delays in setting the nodeClaim object
// wait until the nodeClaim is initialized
if err := CheckNodeClaimStatus(ctx, &nodeClaims.Items[i], kubeClient); err != nil {
return err
}
Expand All @@ -263,13 +309,26 @@ func WaitForPendingNodeClaims(ctx context.Context, workspaceObj *kaitov1alpha1.W
return nil
}

// ListNodeClaimByWorkspace list all nodeClaim objects in the cluster that are created by the workspace identified by the label.
func ListNodeClaimByWorkspace(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) (*v1beta1.NodeClaimList, error) {
// ListNodeClaimByWorkspace lists all nodeClaim objects in the cluster that are created by the given workspace or RAGEngine.
func ListNodeClaimByWorkspace(ctx context.Context, obj interface{}, kubeClient client.Client) (*v1beta1.NodeClaimList, error) {
nodeClaimList := &v1beta1.NodeClaimList{}

ls := labels.Set{
kaitov1alpha1.LabelWorkspaceName: workspaceObj.Name,
kaitov1alpha1.LabelWorkspaceNamespace: workspaceObj.Namespace,
var ls labels.Set

// Build label selector based on the type of the input object
switch o := obj.(type) {
case *kaitov1alpha1.Workspace:
ls = labels.Set{
kaitov1alpha1.LabelWorkspaceName: o.Name,
kaitov1alpha1.LabelWorkspaceNamespace: o.Namespace,
}
case *kaitov1alpha1.RAGEngine:
ls = labels.Set{
kaitov1alpha1.LabelRAGEngineName: o.Name,

Check failure on line 327 in pkg/nodeclaim/nodeclaim.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: kaitov1alpha1.LabelRAGEngineName

Check failure on line 327 in pkg/nodeclaim/nodeclaim.go

View workflow job for this annotation

GitHub Actions / build

undefined: kaitov1alpha1.LabelRAGEngineName
kaitov1alpha1.LabelRAGEngineNamespace: o.Namespace,

Check failure on line 328 in pkg/nodeclaim/nodeclaim.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: kaitov1alpha1.LabelRAGEngineNamespace

Check failure on line 328 in pkg/nodeclaim/nodeclaim.go

View workflow job for this annotation

GitHub Actions / build

undefined: kaitov1alpha1.LabelRAGEngineNamespace
}
default:
return nil, fmt.Errorf("unsupported object type: %T", obj)
}

err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
Expand Down

0 comments on commit f8fd204

Please sign in to comment.