Skip to content

Commit

Permalink
feat: Add workspace status to present the tuning job status (#529)
Browse files Browse the repository at this point in the history
This PR made a few changes in terms of the workspace status update.
- Since the workspace CRD now supports both inference and tuning, the
`WorkspaceConditionTypeReady` condition is renamed to
`WorkspaceConditionTypeSucceeded`, which can be used for both use cases
to represent the overall status.
- Add a `WorkspaceConditionTypeTuningJobStatus` condition to indicate that the job
pod has started without error.
- Since the controller has a watcher for the job, once the job has changed
its status to "succeeded", the reconciler will update the
`WorkspaceConditionTypeSucceeded` condition to true.
- The CRD printer columns are also updated accordingly.

---------

Signed-off-by: Fei Guo <vrgf2003@gmail.com>
Co-authored-by: Ishaan Sehgal <ishaanforthewin@gmail.com>
  • Loading branch information
Fei-Guo and ishaansehgal99 authored Jul 21, 2024
1 parent 4fdecc2 commit 4d5cd2d
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 21 deletions.
11 changes: 8 additions & 3 deletions api/v1alpha1/workspace_condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ const (
// WorkspaceConditionTypeResourceStatus is the state when Resource has been created.
WorkspaceConditionTypeResourceStatus = ConditionType("ResourceReady")

// WorkspaceConditionTypeInferenceStatus is the state when Inference has been created.
// WorkspaceConditionTypeInferenceStatus is the state when Inference service has been ready.
WorkspaceConditionTypeInferenceStatus = ConditionType("InferenceReady")

// WorkspaceConditionTypeTuningJobStatus is the state when the tuning job starts normally.
WorkspaceConditionTypeTuningJobStatus ConditionType = ConditionType("JobStarted")

//WorkspaceConditionTypeDeleting is the Workspace state when starts to get deleted.
WorkspaceConditionTypeDeleting = ConditionType("WorkspaceDeleting")

//WorkspaceConditionTypeReady is the Workspace state that summarize all operations' state.
WorkspaceConditionTypeReady ConditionType = ConditionType("WorkspaceReady")
//WorkspaceConditionTypeSucceeded is the Workspace state that summarizes all operations' states.
//For inference, the "True" condition means the inference service is ready to serve requests.
//For fine tuning, the "True" condition means the tuning job completes successfully.
WorkspaceConditionTypeSucceeded ConditionType = ConditionType("WorkspaceSucceeded")
)
3 changes: 2 additions & 1 deletion api/v1alpha1/workspace_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ type WorkspaceStatus struct {
// +kubebuilder:printcolumn:name="Instance",type="string",JSONPath=".resource.instanceType",description=""
// +kubebuilder:printcolumn:name="ResourceReady",type="string",JSONPath=".status.conditions[?(@.type==\"ResourceReady\")].status",description=""
// +kubebuilder:printcolumn:name="InferenceReady",type="string",JSONPath=".status.conditions[?(@.type==\"InferenceReady\")].status",description=""
// +kubebuilder:printcolumn:name="WorkspaceReady",type="string",JSONPath=".status.conditions[?(@.type==\"WorkspaceReady\")].status",description=""
// +kubebuilder:printcolumn:name="JobStarted",type="string",JSONPath=".status.conditions[?(@.type==\"JobStarted\")].status",description=""
// +kubebuilder:printcolumn:name="WorkspaceSucceeded",type="string",JSONPath=".status.conditions[?(@.type==\"WorkspaceSucceeded\")].status",description=""
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description=""
type Workspace struct {
metav1.TypeMeta `json:",inline"`
Expand Down
7 changes: 5 additions & 2 deletions charts/kaito/workspace/crds/kaito.sh_workspaces.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ spec:
- jsonPath: .status.conditions[?(@.type=="InferenceReady")].status
name: InferenceReady
type: string
- jsonPath: .status.conditions[?(@.type=="WorkspaceReady")].status
name: WorkspaceReady
- jsonPath: .status.conditions[?(@.type=="JobStarted")].status
name: JobStarted
type: string
- jsonPath: .status.conditions[?(@.type=="WorkspaceSucceeded")].status
name: WorkspaceSucceeded
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
Expand Down
7 changes: 5 additions & 2 deletions config/crd/bases/kaito.sh_workspaces.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ spec:
- jsonPath: .status.conditions[?(@.type=="InferenceReady")].status
name: InferenceReady
type: string
- jsonPath: .status.conditions[?(@.type=="WorkspaceReady")].status
name: WorkspaceReady
- jsonPath: .status.conditions[?(@.type=="JobStarted")].status
name: JobStarted
type: string
- jsonPath: .status.conditions[?(@.type=="WorkspaceSucceeded")].status
name: WorkspaceSucceeded
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
Expand Down
52 changes: 42 additions & 10 deletions pkg/controllers/workspace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c *WorkspaceReconciler) addOrUpdateWorkspace(ctx context.Context, wObj *ka
// Read ResourceSpec
err := c.applyWorkspaceResource(ctx, wObj)
if err != nil {
if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeReady, metav1.ConditionFalse,
if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeSucceeded, metav1.ConditionFalse,
"workspaceFailed", err.Error()); updateErr != nil {
klog.ErrorS(updateErr, "failed to update workspace status", "workspace", klog.KObj(wObj))
return reconcile.Result{}, updateErr
Expand All @@ -117,32 +117,53 @@ func (c *WorkspaceReconciler) addOrUpdateWorkspace(ctx context.Context, wObj *ka

if wObj.Tuning != nil {
if err = c.applyTuning(ctx, wObj); err != nil {
if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeSucceeded, metav1.ConditionFalse,
"workspaceFailed", err.Error()); updateErr != nil {
klog.ErrorS(updateErr, "failed to update workspace status", "workspace", klog.KObj(wObj))
return reconcile.Result{}, updateErr
}
return reconcile.Result{}, err
}
}
if wObj.Inference != nil {
// Only mark workspace succeeded when job completes.
job := &batchv1.Job{}
if err = resources.GetResource(ctx, wObj.Name, wObj.Namespace, c.Client, job); err == nil {
if job.Status.Succeeded > 0 {
if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeSucceeded, metav1.ConditionTrue,
"workspaceSucceeded", "workspace succeeds"); updateErr != nil {
klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj))
return reconcile.Result{}, err
}
} else { // The job is still running
if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeSucceeded, metav1.ConditionFalse,
"workspacePending", "workspace has not completed"); updateErr != nil {
klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj))
return reconcile.Result{}, err
}
}
}
} else if wObj.Inference != nil {
if err := c.ensureService(ctx, wObj); err != nil {
if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeReady, metav1.ConditionFalse,
if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeSucceeded, metav1.ConditionFalse,
"workspaceFailed", err.Error()); updateErr != nil {
klog.ErrorS(updateErr, "failed to update workspace status", "workspace", klog.KObj(wObj))
return reconcile.Result{}, updateErr
}
return reconcile.Result{}, err
}
if err = c.applyInference(ctx, wObj); err != nil {
if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeReady, metav1.ConditionFalse,
if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeSucceeded, metav1.ConditionFalse,
"workspaceFailed", err.Error()); updateErr != nil {
klog.ErrorS(updateErr, "failed to update workspace status", "workspace", klog.KObj(wObj))
return reconcile.Result{}, updateErr
}
return reconcile.Result{}, err
}
}

if err = c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeReady, metav1.ConditionTrue,
"workspaceReady", "workspace is ready"); err != nil {
klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj))
return reconcile.Result{}, err
if err = c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeSucceeded, metav1.ConditionTrue,
"workspaceSucceeded", "workspace succeeds"); err != nil {
klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj))
return reconcile.Result{}, err
}
}

return reconcile.Result{}, nil
Expand Down Expand Up @@ -561,6 +582,17 @@ func (c *WorkspaceReconciler) applyTuning(ctx context.Context, wObj *kaitov1alph
}()

if err != nil {
if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeTuningJobStatus, metav1.ConditionFalse,
"WorkspaceTuningJobStatusFailed", err.Error()); updateErr != nil {
klog.ErrorS(updateErr, "failed to update workspace status", "workspace", klog.KObj(wObj))
return updateErr
}
return err
}

if err := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeTuningJobStatus, metav1.ConditionTrue,
"WorkspaceTuningJobStatusStarted", "Tuning job has started"); err != nil {
klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj))
return err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func CheckResourceStatus(obj client.Object, kubeClient client.Client, timeoutDur
return nil
}
case *batchv1.Job:
klog.InfoS("checking job status", "name", k8sResource.Name, "namespace", k8sResource.Namespace, "succeeded", k8sResource.Status.Succeeded, "active", k8sResource.Status.Active, "failed", k8sResource.Status.Failed)
if k8sResource.Status.Failed == 0 {
if k8sResource.Status.Active > 0 || k8sResource.Status.Succeeded > 0 {
klog.InfoS("job status is active/succeeded", "name", k8sResource.Name)
return nil
}
default:
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/preset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func validateWorkspaceReadiness(workspaceObj *kaitov1alpha1.Workspace) {
}

_, conditionFound := lo.Find(workspaceObj.Status.Conditions, func(condition metav1.Condition) bool {
return condition.Type == string(kaitov1alpha1.WorkspaceConditionTypeReady) &&
return condition.Type == string(kaitov1alpha1.WorkspaceConditionTypeSucceeded) &&
condition.Status == metav1.ConditionTrue
})
return conditionFound
Expand Down

0 comments on commit 4d5cd2d

Please sign in to comment.