Add check for existing Argo Workflows (opendatahub-io#955)
* Add check for existing Argo Workflows

* Move resource cleanup before DSC creation

* Add separate workflow for upgrades and installs

* Remove conditions that are not required

* Remove check for DSP component

* Add more specific error comparison

* Fix linter

* Add Error phase

* Fix linter
VaishnaviHire authored Apr 12, 2024
1 parent 32cb14d commit 7389fcb
Showing 4 changed files with 89 additions and 12 deletions.
32 changes: 29 additions & 3 deletions components/datasciencepipelines/datasciencepipelines.go
@@ -10,19 +10,23 @@ import (

"github.com/go-logr/logr"
operatorv1 "github.com/openshift/api/operator/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

dsciv1 "github.com/opendatahub-io/opendatahub-operator/v2/apis/dscinitialization/v1"
"github.com/opendatahub-io/opendatahub-operator/v2/components"
"github.com/opendatahub-io/opendatahub-operator/v2/pkg/deploy"
"github.com/opendatahub-io/opendatahub-operator/v2/pkg/metadata/labels"
"github.com/opendatahub-io/opendatahub-operator/v2/pkg/monitoring"
)

var (
-ComponentName = "data-science-pipelines-operator"
-Path = deploy.DefaultManifestPath + "/" + ComponentName + "/base"
-OverlayPath = deploy.DefaultManifestPath + "/" + ComponentName + "/overlays"
+ComponentName = "data-science-pipelines-operator"
+Path = deploy.DefaultManifestPath + "/" + ComponentName + "/base"
+OverlayPath = deploy.DefaultManifestPath + "/" + ComponentName + "/overlays"
+ArgoWorkflowCRD = "workflows.argoproj.io"
)

// Verifies that DataSciencePipelines implements ComponentInterface.
@@ -104,6 +108,10 @@ func (d *DataSciencePipelines) ReconcileComponent(ctx context.Context,
return fmt.Errorf("failed to update image from %s : %w", Path, err)
}
}
// Check for existing Argo Workflows
if err := UnmanagedArgoWorkFlowExists(ctx, cli); err != nil {
return err
}
}

// new overlay
@@ -141,3 +149,21 @@ func (d *DataSciencePipelines) ReconcileComponent(ctx context.Context,

return nil
}

func UnmanagedArgoWorkFlowExists(ctx context.Context,
cli client.Client) error {
workflowCRD := &apiextensionsv1.CustomResourceDefinition{}
if err := cli.Get(ctx, client.ObjectKey{Name: ArgoWorkflowCRD}, workflowCRD); err != nil {
if apierrs.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to get existing Workflow CRD : %w", err)
}
// Verify whether the existing Workflow CRD was deployed by ODH
_, odhLabelExists := workflowCRD.Labels[labels.ODH.Component(ComponentName)]
if odhLabelExists {
return nil
}
return fmt.Errorf(" %v CRD already exists but not deployed by this operator. Remove existing Argo workflows or set datasciencepipelines to Removed to proceed ",
ArgoWorkflowCRD)
}
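For orientation (not part of the commit), here is a minimal test sketch of how the new check behaves against a controller-runtime fake client. The test file, scheme wiring, and the literal label key "app.opendatahub.io/" + ComponentName are assumptions; the key is inferred from the CRD predicate in the controller change below, where labels.ODH.Component is not available.

package datasciencepipelines_test

import (
	"context"
	"testing"

	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	"github.com/opendatahub-io/opendatahub-operator/v2/components/datasciencepipelines"
)

func TestUnmanagedArgoWorkFlowExists(t *testing.T) {
	scheme := runtime.NewScheme()
	if err := apiextensionsv1.AddToScheme(scheme); err != nil {
		t.Fatal(err)
	}

	// A Workflow CRD without the ODH component label simulates a
	// user-installed Argo; the check should refuse to proceed.
	unmanaged := &apiextensionsv1.CustomResourceDefinition{
		ObjectMeta: metav1.ObjectMeta{Name: datasciencepipelines.ArgoWorkflowCRD},
	}
	cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(unmanaged).Build()
	if err := datasciencepipelines.UnmanagedArgoWorkFlowExists(context.Background(), cli); err == nil {
		t.Error("expected an error for an unmanaged Workflow CRD")
	}

	// The same CRD carrying the (assumed) ODH component label is treated
	// as operator-managed and passes the check.
	managed := &apiextensionsv1.CustomResourceDefinition{
		ObjectMeta: metav1.ObjectMeta{
			Name:   datasciencepipelines.ArgoWorkflowCRD,
			Labels: map[string]string{"app.opendatahub.io/" + datasciencepipelines.ComponentName: "true"},
		},
	}
	cli = fake.NewClientBuilder().WithScheme(scheme).WithObjects(managed).Build()
	if err := datasciencepipelines.UnmanagedArgoWorkFlowExists(context.Background(), cli); err != nil {
		t.Errorf("expected nil for an ODH-managed Workflow CRD, got: %v", err)
	}

	// With no Workflow CRD present at all, the check is a no-op.
	cli = fake.NewClientBuilder().WithScheme(scheme).Build()
	if err := datasciencepipelines.UnmanagedArgoWorkFlowExists(context.Background(), cli); err != nil {
		t.Errorf("expected nil when no Workflow CRD exists, got: %v", err)
	}
}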
41 changes: 38 additions & 3 deletions controllers/datasciencecluster/datasciencecluster_controller.go
@@ -33,6 +33,7 @@ import (
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
authv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@@ -51,6 +52,7 @@ import (
dsc "github.com/opendatahub-io/opendatahub-operator/v2/apis/datasciencecluster/v1"
dsci "github.com/opendatahub-io/opendatahub-operator/v2/apis/dscinitialization/v1"
"github.com/opendatahub-io/opendatahub-operator/v2/components"
"github.com/opendatahub-io/opendatahub-operator/v2/components/datasciencepipelines"
"github.com/opendatahub-io/opendatahub-operator/v2/controllers/status"
"github.com/opendatahub-io/opendatahub-operator/v2/pkg/cluster"
"github.com/opendatahub-io/opendatahub-operator/v2/pkg/upgrade"
@@ -77,7 +79,7 @@ const (

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
-func (r *DataScienceClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { //nolint:gocyclo
+func (r *DataScienceClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { //nolint:maintidx,gocyclo
r.Log.Info("Reconciling DataScienceCluster resources", "Request.Name", req.Name)

instances := &dsc.DataScienceClusterList{}
@@ -192,6 +194,21 @@ func (r *DataScienceClusterReconciler) Reconcile(ctx context.Context, req ctrl.R

return ctrl.Result{}, nil
}
// Check preconditions if this is an upgrade
if instance.Status.Phase == status.PhaseReady {
// Check for existence of Argo Workflows if DSP is installed
if instance.Status.InstalledComponents[datasciencepipelines.ComponentName] {
if err := datasciencepipelines.UnmanagedArgoWorkFlowExists(ctx, r.Client); err != nil {
message := fmt.Sprintf("Failed upgrade: %v", err.Error())
_, err = status.UpdateWithRetry(ctx, r.Client, instance, func(saved *dsc.DataScienceCluster) {
status.SetExistingArgoCondition(&saved.Status.Conditions, status.ArgoWorkflowExist, message)
status.SetErrorCondition(&saved.Status.Conditions, status.ArgoWorkflowExist, message)
saved.Status.Phase = status.PhaseError
})
return ctrl.Result{}, err
}
}
}

// Start reconciling
if instance.Status.Conditions == nil {
@@ -281,12 +298,15 @@ func (r *DataScienceClusterReconciler) reconcileSubComponent(ctx context.Context
instance = r.reportError(err, instance, "failed to reconcile "+componentName+" on DataScienceCluster")
instance, _ = status.UpdateWithRetry(ctx, r.Client, instance, func(saved *dsc.DataScienceCluster) {
if enabled {
-status.SetComponentCondition(&saved.Status.Conditions, componentName, status.ReconcileFailed, fmt.Sprintf("Component reconciliation failed: %v", err), corev1.ConditionFalse)
+if strings.Contains(err.Error(), datasciencepipelines.ArgoWorkflowCRD+" CRD already exists") {
+status.SetExistingArgoCondition(&saved.Status.Conditions, status.ArgoWorkflowExist, fmt.Sprintf("Component update failed: %v", err))
+} else {
+status.SetComponentCondition(&saved.Status.Conditions, componentName, status.ReconcileFailed, fmt.Sprintf("Component reconciliation failed: %v", err), corev1.ConditionFalse)
+}
} else {
status.SetComponentCondition(&saved.Status.Conditions, componentName, status.ReconcileFailed, fmt.Sprintf("Component removal failed: %v", err), corev1.ConditionFalse)
}
})

return instance, err
}
// reconciliation succeeded: update status accordingly
@@ -407,6 +427,8 @@ func (r *DataScienceClusterReconciler) SetupWithManager(mgr ctrl.Manager) error
Owns(&corev1.ServiceAccount{}, builder.WithPredicates(saPredicates)).
Watches(&source.Kind{Type: &dsci.DSCInitialization{}}, handler.EnqueueRequestsFromMapFunc(r.watchDataScienceClusterResources)).
Watches(&source.Kind{Type: &corev1.ConfigMap{}}, handler.EnqueueRequestsFromMapFunc(r.watchDataScienceClusterResources), builder.WithPredicates(configMapPredicates)).
Watches(&source.Kind{Type: &apiextensionsv1.CustomResourceDefinition{}}, handler.EnqueueRequestsFromMapFunc(r.watchDataScienceClusterResources),
builder.WithPredicates(argoWorkflowCRDPredicates)).
// this predicate prevents meaningless reconciliations from being triggered
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{})).
Complete(r)
@@ -446,3 +468,16 @@ func (r *DataScienceClusterReconciler) watchDataScienceClusterResources(a client

return nil
}

// argoWorkflowCRDPredicates filters delete events to trigger a reconcile when the Argo Workflow CRD is deleted.
var argoWorkflowCRDPredicates = predicate.Funcs{
DeleteFunc: func(e event.DeleteEvent) bool {
if e.Object.GetName() == datasciencepipelines.ArgoWorkflowCRD {
labels := e.Object.GetLabels()
if _, ok := labels["app.opendatahub.io/"+datasciencepipelines.ComponentName]; !ok {
return true
}
}
return false
},
}
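Again as a sketch rather than part of the change: since argoWorkflowCRDPredicates is unexported, a hypothetical in-package test (placement and label key assumed, as above) can illustrate which delete events pass the filter:

package datasciencecluster

import (
	"testing"

	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/event"

	"github.com/opendatahub-io/opendatahub-operator/v2/components/datasciencepipelines"
)

func TestArgoWorkflowCRDPredicates(t *testing.T) {
	crd := &apiextensionsv1.CustomResourceDefinition{
		ObjectMeta: metav1.ObjectMeta{Name: datasciencepipelines.ArgoWorkflowCRD},
	}

	// Deleting an unlabeled (user-installed) Workflow CRD passes the filter,
	// so a previously blocked DSP reconcile can be retried.
	if !argoWorkflowCRDPredicates.Delete(event.DeleteEvent{Object: crd}) {
		t.Error("expected delete of an unmanaged Workflow CRD to trigger a reconcile")
	}

	// Deleting the ODH-labeled CRD is the operator's own cleanup; filtered out.
	crd.Labels = map[string]string{
		"app.opendatahub.io/" + datasciencepipelines.ComponentName: "true",
	}
	if argoWorkflowCRDPredicates.Delete(event.DeleteEvent{Object: crd}) {
		t.Error("expected delete of the ODH-managed Workflow CRD to be filtered out")
	}

	// Deletes of unrelated CRDs never match.
	other := &apiextensionsv1.CustomResourceDefinition{
		ObjectMeta: metav1.ObjectMeta{Name: "foos.example.com"},
	}
	if argoWorkflowCRDPredicates.Delete(event.DeleteEvent{Object: other}) {
		t.Error("expected deletes of unrelated CRDs to be ignored")
	}
}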
18 changes: 17 additions & 1 deletion controllers/status/status.go
@@ -21,6 +21,8 @@ package status
import (
conditionsv1 "github.com/openshift/custom-resource-status/conditions/v1"
corev1 "k8s.io/api/core/v1"

"github.com/opendatahub-io/opendatahub-operator/v2/components/datasciencepipelines"
)

// These constants represent the overall Phase as used by .Status.Phase.
@@ -68,13 +70,15 @@ const (
const (
CapabilityServiceMesh conditionsv1.ConditionType = "CapabilityServiceMesh"
CapabilityServiceMeshAuthorization conditionsv1.ConditionType = "CapabilityServiceMeshAuthorization"
CapabilityDSPv2Argo conditionsv1.ConditionType = "CapabilityDSPv2Argo"
)

const (
MissingOperatorReason string = "MissingOperator"
ConfiguredReason string = "Configured"
RemovedReason string = "Removed"
CapabilityFailed string = "CapabilityFailed"
ArgoWorkflowExist string = "ArgoWorkflowExist"
)

const (
@@ -145,7 +149,7 @@ func SetErrorCondition(conditions *[]conditionsv1.Condition, reason string, mess
})
conditionsv1.SetStatusCondition(conditions, conditionsv1.Condition{
Type: conditionsv1.ConditionUpgradeable,
-Status: corev1.ConditionUnknown,
+Status: corev1.ConditionFalse,
Reason: reason,
Message: message,
})
@@ -184,6 +188,7 @@ func SetCompleteCondition(conditions *[]conditionsv1.Condition, reason string, m
Reason: reason,
Message: message,
})
conditionsv1.RemoveStatusCondition(conditions, CapabilityDSPv2Argo)
}

// SetComponentCondition appends Condition Type with const ReadySuffix for given component
Expand All @@ -203,3 +208,14 @@ func RemoveComponentCondition(conditions *[]conditionsv1.Condition, component st
condType := component + ReadySuffix
conditionsv1.RemoveStatusCondition(conditions, conditionsv1.ConditionType(condType))
}

func SetExistingArgoCondition(conditions *[]conditionsv1.Condition, reason, message string) {
conditionsv1.SetStatusCondition(conditions, conditionsv1.Condition{
Type: CapabilityDSPv2Argo,
Status: corev1.ConditionFalse,
Reason: reason,
Message: message,
})

SetComponentCondition(conditions, datasciencepipelines.ComponentName, ReconcileFailed, message, corev1.ConditionFalse)
}
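A small standalone sketch (assuming the module import paths resolve; the message string is illustrative) of what SetExistingArgoCondition leaves in the conditions slice:

package main

import (
	"fmt"

	conditionsv1 "github.com/openshift/custom-resource-status/conditions/v1"
	corev1 "k8s.io/api/core/v1"

	"github.com/opendatahub-io/opendatahub-operator/v2/controllers/status"
)

func main() {
	var conditions []conditionsv1.Condition
	status.SetExistingArgoCondition(&conditions, status.ArgoWorkflowExist,
		"workflows.argoproj.io CRD already exists but is not deployed by this operator")

	// The helper records the CapabilityDSPv2Argo condition and, via
	// SetComponentCondition, a failed Ready condition for the DSP component.
	for _, c := range conditions {
		fmt.Printf("%s=%s reason=%s\n", c.Type, c.Status, c.Reason)
	}

	argo := conditionsv1.FindStatusCondition(conditions, status.CapabilityDSPv2Argo)
	fmt.Println(argo != nil && argo.Status == corev1.ConditionFalse) // true
}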
10 changes: 5 additions & 5 deletions main.go
@@ -224,11 +224,6 @@ func main() { //nolint:funlen
}
}

-// Apply update from legacy operator
-if err = upgrade.UpdateFromLegacyVersion(setupClient, platform, dscApplicationsNamespace, dscMonitoringNamespace); err != nil {
-setupLog.Error(err, "unable to update from legacy operator version")
-}

var cleanExistingResourceFunc manager.RunnableFunc = func(ctx context.Context) error {
if err = upgrade.CleanupExistingResource(ctx, setupClient, platform, dscApplicationsNamespace, dscMonitoringNamespace); err != nil {
setupLog.Error(err, "unable to perform cleanup")
@@ -240,6 +235,11 @@ func main() { //nolint:funlen
setupLog.Error(err, "error remove deprecated resources from previous version")
}

+// Apply update from legacy operator
+if err = upgrade.UpdateFromLegacyVersion(setupClient, platform, dscApplicationsNamespace, dscMonitoringNamespace); err != nil {
+setupLog.Error(err, "unable to update from legacy operator version")
+}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
