diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index b33f04b857..10fdaccff7 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -108,6 +108,7 @@ require ( github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.3 // indirect + github.com/go-redis/redis v6.15.7+incompatible // indirect github.com/go-sql-driver/mysql v1.6.0 // indirect github.com/go-test/deep v1.0.7 // indirect github.com/goccy/go-json v0.10.2 // indirect @@ -155,6 +156,7 @@ require ( github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/ncw/swift v1.0.53 // indirect + github.com/nxadm/tail v1.4.11 // indirect github.com/ory/go-acc v0.2.6 // indirect github.com/ory/go-convenience v0.1.0 // indirect github.com/ory/viper v1.7.5 // indirect @@ -201,6 +203,7 @@ require ( golang.org/x/term v0.27.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect + gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect @@ -214,6 +217,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.28.4 // indirect + k8s.io/component-base v0.28.4 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index b2af14d8cf..8b48761ab9 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -340,6 +340,8 @@ github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+ github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA= github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7vS9k0lo6zwJo= github.com/go-openapi/validate v0.19.10/go.mod h1:RKEZTUWDkxKQxN2jDT7ZnZi2bhZlbNMAuKvKB+IaGx8= +github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U= +github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= @@ -1013,6 +1015,8 @@ github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U= +github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= +github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/oleiade/reflections v1.0.0/go.mod h1:RbATFBbKYkVdqmSFtx13Bb/tVhR0lgOBXunWTZKeL4w= github.com/oleiade/reflections v1.0.1 h1:D1XO3LVEYroYskEsoSiGItp9RUxG6jWnCVvrqH0HHQM= @@ -1024,6 +1028,7 @@ github.com/onsi/ginkgo v1.9.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM= github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= @@ -1971,6 +1976,7 @@ gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76 gopkg.in/square/go-jose.v2 v2.5.2-0.20210529014059-a5c7eec3c614/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI= gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/flyteadmin/pkg/workflowengine/impl/prepare_execution.go b/flyteadmin/pkg/workflowengine/impl/prepare_execution.go index 70afadbd7b..7432a08ea6 100644 --- a/flyteadmin/pkg/workflowengine/impl/prepare_execution.go +++ b/flyteadmin/pkg/workflowengine/impl/prepare_execution.go @@ -4,12 +4,14 @@ import ( "github.com/golang/protobuf/proto" "google.golang.org/grpc/codes" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller" ) func addMapValues(overrides map[string]string, defaultValues map[string]string) map[string]string { @@ -130,7 +132,7 @@ func PrepareFlyteWorkflow(data interfaces.ExecutionData, flyteWorkflow *v1alpha1 flyteWorkflow.AcceptedAt = &acceptAtWrapper // Add finalizer - flyteWorkflow.Finalizers = append(flyteWorkflow.Finalizers, "flyte-finalizer") + _ = controllerutil.AddFinalizer(flyteWorkflow, controller.Finalizer) // add permissions from auth and security context. Adding permissions from auth would be removed once all clients // have migrated over to security context diff --git a/flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go b/flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go index 58d5fa3934..2a6ff2008c 100644 --- a/flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go +++ b/flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go @@ -14,6 +14,7 @@ import ( "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller" ) const testRole = "role" @@ -254,5 +255,5 @@ func TestPrepareFlyteWorkflow(t *testing.T) { OutputLocationPrefix: "s3://bucket/key", }, }) - assert.Equal(t, flyteWorkflow.Finalizers, []string{"flyte-finalizer"}) + assert.Equal(t, flyteWorkflow.Finalizers, []string{controller.Finalizer}) } diff --git a/flyteplugins/go/tasks/plugins/array/k8s/subtask.go b/flyteplugins/go/tasks/plugins/array/k8s/subtask.go index cb1a2c0b5f..bdc1c716b7 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/subtask.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/subtask.go @@ -13,6 +13,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8stypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors" pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" @@ -30,8 +31,11 @@ const ( ErrBuildPodTemplate stdErrors.ErrorCode = "POD_TEMPLATE_FAILED" ErrReplaceCmdTemplate stdErrors.ErrorCode = "CMD_TEMPLATE_FAILED" FlyteK8sArrayIndexVarName string = "FLYTE_K8S_ARRAY_INDEX" - finalizer string = "flyte/array" - JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME" + finalizer string = "flyte.org/finalizer-array" + // Old non-domain-qualified finalizer for backwards compatibility + // This should eventually be removed + oldFinalizer string = "flyte/array" + JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME" ) var ( @@ -69,8 +73,7 @@ func addMetadata(stCtx SubTaskExecutionContext, cfg *Config, k8sPluginCfg *confi } if k8sPluginCfg.InjectFinalizer { - f := append(pod.GetFinalizers(), finalizer) - pod.SetFinalizers(f) + _ = controllerutil.AddFinalizer(pod, finalizer) } if len(cfg.DefaultScheduler) > 0 { @@ -134,7 +137,7 @@ func abortSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Confi } if err != nil && !isK8sObjectNotExists(err) { - logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", + logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v/%v. Error: %v", resourceToFinalize.GetNamespace(), resourceToFinalize.GetName(), err) return err } @@ -142,17 +145,20 @@ func abortSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Confi return nil } -// clearFinalizers removes finalizers (if they exist) from the k8s resource -func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCore.KubeClient) error { - if len(o.GetFinalizers()) > 0 { - o.SetFinalizers([]string{}) +// clearFinalizer removes the Flyte finalizer (if it exists) from the k8s resource +func clearFinalizer(ctx context.Context, o client.Object, kubeClient pluginsCore.KubeClient) error { + // Checking for the old finalizer too for backwards compatibility. This should eventually be removed + // Go does short-circuiting so we have to make sure both are removed + finalizerRemoved := controllerutil.RemoveFinalizer(o, finalizer) + oldFinalizerRemoved := controllerutil.RemoveFinalizer(o, oldFinalizer) + if finalizerRemoved || oldFinalizerRemoved { err := kubeClient.GetClient().Update(ctx, o) if err != nil && !isK8sObjectNotExists(err) { - logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", o.GetNamespace(), o.GetName(), err) + logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v/%v. Error: %v", o.GetNamespace(), o.GetName(), err) return err } } else { - logger.Debugf(ctx, "Finalizers are already empty for Resource with name: %v/%v", o.GetNamespace(), o.GetName()) + logger.Debugf(ctx, "Finalizer is already cleared for Resource with name: %v/%v", o.GetNamespace(), o.GetName()) } return nil } @@ -211,7 +217,7 @@ func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Conf } // finalizeSubtask performs operations to complete the k8s pod defined by the SubTaskExecutionContext -// and Config. These may include removing finalizers and deleting the k8s resource. +// and Config. These may include removing finalizer and deleting the k8s resource. func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) error { errs := stdErrors.ErrorCollection{} var pod *v1.Pod @@ -231,10 +237,10 @@ func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Co nsName = k8stypes.NamespacedName{Namespace: pod.GetNamespace(), Name: pod.GetName()} } - // In InjectFinalizer is on, it means we may have added the finalizers when we launched this resource. Attempt to - // clear them to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config) + // In InjectFinalizer is on, it means we may have added the finalizer when we launched this resource. Attempt to + // clear it to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config) // after the resource was created, we will not find any finalizers to clear and the object may have already been - // deleted at this point. Therefore, account for these cases and do not consider them errors. + // deleted at this point. Therefore, account for these cases and do not consider the errors. if k8sPluginCfg.InjectFinalizer { // Attempt to get resource from informer cache, if not found, retrieve it from API server. if err := kubeClient.GetClient().Get(ctx, nsName, pod); err != nil { @@ -250,7 +256,7 @@ func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Co // This must happen after sending admin event. It's safe against partial failures because if the event failed, we will // simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send // the same event (idempotent) and then come here again... - err := clearFinalizers(ctx, pod, kubeClient) + err := clearFinalizer(ctx, pod, kubeClient) if err != nil { errs.Append(err) } @@ -308,10 +314,10 @@ func getSubtaskPhaseInfo(ctx context.Context, stCtx SubTaskExecutionContext, cfg return pluginsCore.PhaseInfoUndefined, err } - if !phaseInfo.Phase().IsTerminal() && o.GetDeletionTimestamp() != nil { + if !phaseInfo.Phase().IsTerminal() && !o.GetDeletionTimestamp().IsZero() { // If the object has been deleted, that is, it has a deletion timestamp, but is not in a terminal state, we should // mark the task as a retryable failure. We've seen this happen when a kubelet disappears - all pods running on - // the node are marked with a deletionTimestamp, but our finalizers prevent the pod from being deleted. + // the node are marked with a deletionTimestamp, but our finalizer prevents the pod from being deleted. // This can also happen when a user deletes a Pod directly. failureReason := fmt.Sprintf("object [%s] terminated in the background, manually", nsName.String()) return pluginsCore.PhaseInfoSystemRetryableFailure("UnexpectedObjectDeletion", failureReason, nil), nil diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index 8d733c33a3..9cef800125 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -61,6 +61,11 @@ import ( ) const ( + // Finalizer is the global and domain-qualified Flyte finalizer + Finalizer = "flyte.org/finalizer" + // OldFinalizer is the old non-domain-qualified finalizer, kept for backwards compatibility + // This should eventually be removed + OldFinalizer = "flyte-finalizer" resourceLevelMonitorCycleDuration = 5 * time.Second missing = "missing" podDefaultNamespace = "flyte" diff --git a/flytepropeller/pkg/controller/finalizer.go b/flytepropeller/pkg/controller/finalizer.go deleted file mode 100644 index f1a8ba8ebd..0000000000 --- a/flytepropeller/pkg/controller/finalizer.go +++ /dev/null @@ -1,36 +0,0 @@ -package controller - -import v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - -const FinalizerKey = "flyte-finalizer" - -// NOTE: Some of these APIs are exclusive and do not compare the actual values of the finalizers. -// the intention of this module is to set only one opaque finalizer at a time. If you want to set multiple (not common) -// finalizers, use this module carefully and at your own risk! - -// Sets a new finalizer in case the finalizer is empty -func SetFinalizerIfEmpty(meta v1.Object, finalizer string) { - if !HasFinalizer(meta) { - meta.SetFinalizers([]string{finalizer}) - } -} - -// Check if the deletion timestamp is set, this is set automatically when an object is deleted -func IsDeleted(meta v1.Object) bool { - return meta.GetDeletionTimestamp() != nil -} - -// Reset all the finalizers on the object -func ResetFinalizers(meta v1.Object) { - meta.SetFinalizers([]string{}) -} - -// Currently we only compare the lengths of finalizers. If you add finalizers directly these API;'s will not work -func FinalizersIdentical(o1 v1.Object, o2 v1.Object) bool { - return len(o1.GetFinalizers()) == len(o2.GetFinalizers()) -} - -// Check if any finalizer is set -func HasFinalizer(meta v1.Object) bool { - return len(meta.GetFinalizers()) != 0 -} diff --git a/flytepropeller/pkg/controller/finalizer_test.go b/flytepropeller/pkg/controller/finalizer_test.go deleted file mode 100644 index 05401806d6..0000000000 --- a/flytepropeller/pkg/controller/finalizer_test.go +++ /dev/null @@ -1,70 +0,0 @@ -package controller - -import ( - "testing" - - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/batch/v1" - v12 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func TestFinalizersIdentical(t *testing.T) { - noFinalizer := &v1.Job{} - withFinalizer := &v1.Job{} - withFinalizer.SetFinalizers([]string{"t1"}) - - assert.True(t, FinalizersIdentical(noFinalizer, noFinalizer)) - assert.True(t, FinalizersIdentical(withFinalizer, withFinalizer)) - assert.False(t, FinalizersIdentical(noFinalizer, withFinalizer)) - withMultipleFinalizers := &v1.Job{} - withMultipleFinalizers.SetFinalizers([]string{"f1", "f2"}) - assert.False(t, FinalizersIdentical(withMultipleFinalizers, withFinalizer)) - - withDiffFinalizer := &v1.Job{} - withDiffFinalizer.SetFinalizers([]string{"f1"}) - assert.True(t, FinalizersIdentical(withFinalizer, withDiffFinalizer)) -} - -func TestIsDeleted(t *testing.T) { - noTermTS := &v1.Job{} - termedTS := &v1.Job{} - n := v12.Now() - termedTS.SetDeletionTimestamp(&n) - - assert.True(t, IsDeleted(termedTS)) - assert.False(t, IsDeleted(noTermTS)) -} - -func TestHasFinalizer(t *testing.T) { - noFinalizer := &v1.Job{} - withFinalizer := &v1.Job{} - withFinalizer.SetFinalizers([]string{"t1"}) - - assert.False(t, HasFinalizer(noFinalizer)) - assert.True(t, HasFinalizer(withFinalizer)) -} - -func TestSetFinalizerIfEmpty(t *testing.T) { - noFinalizer := &v1.Job{} - withFinalizer := &v1.Job{} - withFinalizer.SetFinalizers([]string{"t1"}) - - assert.False(t, HasFinalizer(noFinalizer)) - SetFinalizerIfEmpty(noFinalizer, "f1") - assert.True(t, HasFinalizer(noFinalizer)) - assert.Equal(t, []string{"f1"}, noFinalizer.GetFinalizers()) - - SetFinalizerIfEmpty(withFinalizer, "f1") - assert.Equal(t, []string{"t1"}, withFinalizer.GetFinalizers()) -} - -func TestResetFinalizer(t *testing.T) { - noFinalizer := &v1.Job{} - ResetFinalizers(noFinalizer) - assert.Equal(t, []string{}, noFinalizer.GetFinalizers()) - - withFinalizer := &v1.Job{} - withFinalizer.SetFinalizers([]string{"t1"}) - ResetFinalizers(withFinalizer) - assert.Equal(t, []string{}, withFinalizer.GetFinalizers()) -} diff --git a/flytepropeller/pkg/controller/handler.go b/flytepropeller/pkg/controller/handler.go index 3e9f7526fc..42749adfe4 100644 --- a/flytepropeller/pkg/controller/handler.go +++ b/flytepropeller/pkg/controller/handler.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -103,7 +104,7 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F ctx = contextutils.WithResourceVersion(ctx, mutableW.GetResourceVersion()) maxRetries := uint32(p.cfg.MaxWorkflowRetries) // #nosec G115 - if IsDeleted(mutableW) || (mutableW.Status.FailedAttempts > maxRetries) { + if !mutableW.GetDeletionTimestamp().IsZero() || mutableW.Status.FailedAttempts > maxRetries { var err error func() { defer func() { @@ -125,7 +126,7 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F if !mutableW.GetExecutionStatus().IsTerminated() { var err error - SetFinalizerIfEmpty(mutableW, FinalizerKey) + _ = controllerutil.AddFinalizer(mutableW, Finalizer) SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion) func() { @@ -210,7 +211,9 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { } if w.GetExecutionStatus().IsTerminated() { - if HasCompletedLabel(w) && !HasFinalizer(w) { + // Checking for the old finalizer for backwards compatibility + // This should be eventually removed + if HasCompletedLabel(w) && !controllerutil.ContainsFinalizer(w, Finalizer) && !controllerutil.ContainsFinalizer(w, OldFinalizer) { logger.Debugf(ctx, "Workflow is terminated.") // This workflow had previously completed, let us ignore it return nil @@ -325,7 +328,9 @@ func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClo // If the end result is a terminated workflow, we remove the labels // We add a completed label so that we can avoid polling for this workflow SetCompletedLabel(mutatedWf, time.Now()) - ResetFinalizers(mutatedWf) + _ = controllerutil.RemoveFinalizer(mutatedWf, Finalizer) + // Backwards compatibility. This should eventually be removed + _ = controllerutil.RemoveFinalizer(mutatedWf, OldFinalizer) } } @@ -387,7 +392,9 @@ func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClo mutableW := w.DeepCopy() // catch potential indefinite update loop if mutatedWf.GetExecutionStatus().IsTerminated() { - ResetFinalizers(mutableW) + _ = controllerutil.RemoveFinalizer(mutableW, Finalizer) + // Backwards compatibility. This should eventually be removed + _ = controllerutil.RemoveFinalizer(mutableW, OldFinalizer) SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion) SetCompletedLabel(mutableW, time.Now()) msg := fmt.Sprintf("Workflow size has breached threshold. Finalized with status: %v", mutatedWf.GetExecutionStatus().GetPhase()) diff --git a/flytepropeller/pkg/controller/handler_test.go b/flytepropeller/pkg/controller/handler_test.go index 3469c1da80..a57a5fdead 100644 --- a/flytepropeller/pkg/controller/handler_test.go +++ b/flytepropeller/pkg/controller/handler_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -249,7 +250,7 @@ func TestPropeller_Handle(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: name, Namespace: namespace, - Finalizers: []string{"f1"}, + Finalizers: []string{Finalizer, "f1"}, }, WorkflowSpec: &v1alpha1.WorkflowSpec{ ID: "w1", @@ -268,7 +269,7 @@ func TestPropeller_Handle(t *testing.T) { assert.NoError(t, err) assert.Equal(t, v1alpha1.WorkflowPhaseSucceeding, r.GetExecutionStatus().GetPhase()) assert.False(t, HasCompletedLabel(r)) - assert.Equal(t, 1, len(r.Finalizers)) + assert.Equal(t, 2, len(r.Finalizers)) }) t.Run("handlingPanics", func(t *testing.T) { @@ -276,7 +277,7 @@ func TestPropeller_Handle(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: name, Namespace: namespace, - Finalizers: []string{"f1"}, + Finalizers: []string{Finalizer, "f1"}, }, WorkflowSpec: &v1alpha1.WorkflowSpec{ ID: "w1", @@ -294,7 +295,7 @@ func TestPropeller_Handle(t *testing.T) { assert.NoError(t, err) assert.Equal(t, v1alpha1.WorkflowPhaseSucceeding, r.GetExecutionStatus().GetPhase()) assert.False(t, HasCompletedLabel(r)) - assert.Equal(t, 1, len(r.Finalizers)) + assert.Equal(t, 2, len(r.Finalizers)) assert.Equal(t, uint32(1), r.Status.FailedAttempts) }) @@ -303,7 +304,7 @@ func TestPropeller_Handle(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: name, Namespace: namespace, - Finalizers: []string{"f1"}, + Finalizers: []string{Finalizer, "f1"}, }, WorkflowSpec: &v1alpha1.WorkflowSpec{ ID: "w1", @@ -322,7 +323,7 @@ func TestPropeller_Handle(t *testing.T) { assert.NoError(t, err) assert.Equal(t, v1alpha1.WorkflowPhaseSucceeding, r.GetExecutionStatus().GetPhase()) assert.False(t, HasCompletedLabel(r)) - assert.Equal(t, 1, len(r.Finalizers)) + assert.Equal(t, 2, len(r.Finalizers)) }) t.Run("retriesExhaustedFinalize", func(t *testing.T) { @@ -330,7 +331,7 @@ func TestPropeller_Handle(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: name, Namespace: namespace, - Finalizers: []string{"f1"}, + Finalizers: []string{Finalizer, "f1"}, }, WorkflowSpec: &v1alpha1.WorkflowSpec{ ID: "w1", @@ -351,7 +352,7 @@ func TestPropeller_Handle(t *testing.T) { r, err := s.Get(ctx, namespace, name) assert.NoError(t, err) assert.Equal(t, v1alpha1.WorkflowPhaseFailed, r.GetExecutionStatus().GetPhase()) - assert.Equal(t, 0, len(r.Finalizers)) + assert.NotContains(t, r.Finalizers, Finalizer) assert.True(t, HasCompletedLabel(r)) assert.True(t, abortCalled) }) @@ -362,7 +363,7 @@ func TestPropeller_Handle(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: name, Namespace: namespace, - Finalizers: []string{"f1"}, + Finalizers: []string{Finalizer, "f1"}, DeletionTimestamp: &n, }, WorkflowSpec: &v1alpha1.WorkflowSpec{ @@ -381,7 +382,7 @@ func TestPropeller_Handle(t *testing.T) { r, err := s.Get(ctx, namespace, name) assert.NoError(t, err) assert.Equal(t, v1alpha1.WorkflowPhaseAborted, r.GetExecutionStatus().GetPhase()) - assert.Equal(t, 0, len(r.Finalizers)) + assert.NotContains(t, r.Finalizers, Finalizer) assert.True(t, HasCompletedLabel(r)) }) @@ -420,7 +421,7 @@ func TestPropeller_Handle(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: name, Namespace: namespace, - Finalizers: []string{"f1"}, + Finalizers: []string{Finalizer, "f1"}, }, WorkflowSpec: &v1alpha1.WorkflowSpec{ ID: "w1", @@ -435,7 +436,7 @@ func TestPropeller_Handle(t *testing.T) { r, err := s.Get(ctx, namespace, name) assert.NoError(t, err) assert.Equal(t, v1alpha1.WorkflowPhaseSuccess, r.GetExecutionStatus().GetPhase()) - assert.Equal(t, 0, len(r.Finalizers)) + assert.NotContains(t, r.Finalizers, Finalizer) assert.True(t, HasCompletedLabel(r)) }) @@ -444,7 +445,7 @@ func TestPropeller_Handle(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: name, Namespace: namespace, - Finalizers: []string{"f1"}, + Finalizers: []string{Finalizer, "f1"}, }, WorkflowSpec: &v1alpha1.WorkflowSpec{ ID: "w1", @@ -459,7 +460,7 @@ func TestPropeller_Handle(t *testing.T) { r, err := s.Get(ctx, namespace, name) assert.NoError(t, err) assert.Equal(t, v1alpha1.WorkflowPhaseFailed, r.GetExecutionStatus().GetPhase()) - assert.Equal(t, 0, len(r.Finalizers)) + assert.NotContains(t, r.Finalizers, Finalizer) assert.True(t, HasCompletedLabel(r)) }) t.Run("failOnExecutionNotFoundError", func(t *testing.T) { @@ -638,7 +639,7 @@ func TestPropeller_Handle_TurboMode(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: name, Namespace: namespace, - Finalizers: []string{"f1"}, + Finalizers: []string{Finalizer, "f1"}, }, WorkflowSpec: &v1alpha1.WorkflowSpec{ ID: "w1", @@ -662,7 +663,7 @@ func TestPropeller_Handle_TurboMode(t *testing.T) { assert.NoError(t, err) assert.Equal(t, v1alpha1.WorkflowPhaseSucceeding, r.GetExecutionStatus().GetPhase()) assert.False(t, HasCompletedLabel(r)) - assert.Equal(t, 1, len(r.Finalizers)) + assert.Equal(t, 2, len(r.Finalizers)) }) t.Run("happy-nochange", func(t *testing.T) { @@ -843,7 +844,7 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) { s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil) s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once() s.On("Update", mock.Anything, mock.MatchedBy(func(w *v1alpha1.FlyteWorkflow) bool { - return w.Status.Phase == v1alpha1.WorkflowPhaseFailed && !HasFinalizer(w) && HasCompletedLabel(w) + return w.Status.Phase == v1alpha1.WorkflowPhaseFailed && !controllerutil.ContainsFinalizer(w, Finalizer) && HasCompletedLabel(w) }), mock.Anything).Return(nil, nil).Once() err := p.Handle(ctx, namespace, name) assert.NoError(t, err) diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go index 431824dad2..6d558f818e 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -19,6 +19,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -41,7 +42,12 @@ import ( "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" ) -const finalizer = "flyte/flytek8s" +const ( + finalizer = "flyte.org/finalizer-k8s" + // Old non-domain-qualified finalizer for backwards compatibility + // This should eventually be removed + oldFinalizer = "flyte/flytek8s" +) const pluginStateVersion = 1 @@ -115,8 +121,7 @@ func (e *PluginManager) addObjectMetadata(taskCtx pluginsCore.TaskExecutionMetad } if cfg.InjectFinalizer && !e.plugin.GetProperties().DisableInjectFinalizer { - f := append(o.GetFinalizers(), finalizer) - o.SetFinalizers(f) + _ = controllerutil.AddFinalizer(o, finalizer) } if errs := validation.IsDNS1123Subdomain(o.GetName()); len(errs) > 0 { @@ -312,10 +317,10 @@ func (e *PluginManager) checkResourcePhase(ctx context.Context, tCtx pluginsCore return pluginsCore.DoTransition(p), nil } - if !p.Phase().IsTerminal() && o.GetDeletionTimestamp() != nil { + if !p.Phase().IsTerminal() && !o.GetDeletionTimestamp().IsZero() { // If the object has been deleted, that is, it has a deletion timestamp, but is not in a terminal state, we should // mark the task as a retryable failure. We've seen this happen when a kubelet disappears - all pods running on - // the node are marked with a deletionTimestamp, but our finalizers prevent the pod from being deleted. + // the node are marked with a deletionTimestamp, but our finalizer prevents the pod from being deleted. // This can also happen when a user deletes a Pod directly. failureReason := fmt.Sprintf("object [%s] terminated in the background, manually", nsName.String()) return pluginsCore.DoTransition(pluginsCore.PhaseInfoSystemRetryableFailure("UnexpectedObjectDeletion", failureReason, nil)), nil @@ -444,7 +449,7 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution } if err != nil && !isK8sObjectNotExists(err) { - logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", + logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v/%v. Error: %v", resourceToFinalize.GetNamespace(), resourceToFinalize.GetName(), err) return err } @@ -452,17 +457,21 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution return nil } -func (e *PluginManager) clearFinalizers(ctx context.Context, o client.Object) error { - if len(o.GetFinalizers()) > 0 { - o.SetFinalizers([]string{}) +// clearFinalizer removes the Flyte finalizer (if it exists) from the k8s resource +func (e *PluginManager) clearFinalizer(ctx context.Context, o client.Object) error { + // Checking for the old finalizer too for backwards compatibility. This should eventually be removed + // Go does short-circuiting and we have to make sure both are removed + finalizerRemoved := controllerutil.RemoveFinalizer(o, finalizer) + oldFinalizerRemoved := controllerutil.RemoveFinalizer(o, oldFinalizer) + if finalizerRemoved || oldFinalizerRemoved { err := e.kubeClient.GetClient().Update(ctx, o) if err != nil && !isK8sObjectNotExists(err) { - logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", + logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v/%v. Error: %v", o.GetNamespace(), o.GetName(), err) return err } } else { - logger.Debugf(ctx, "Finalizers are already empty for Resource with name: %v/%v", + logger.Debugf(ctx, "Finalizer is already cleared from Resource with name: %v/%v", o.GetNamespace(), o.GetName()) } return nil @@ -487,7 +496,7 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu Steps: e.updateBackoffRetries, } - // Attempt to cleanup finalizers so that the object may be deleted/garbage collected. We try to clear them for all + // Attempt to cleanup finalizer so that the object may be deleted/garbage collected. We try to clear it for all // objects, regardless of whether or not InjectFinalizer is configured to handle all cases where InjectFinalizer is // enabled/disabled during object execution. var lastErr error @@ -507,14 +516,14 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu // This must happen after sending admin event. It's safe against partial failures because if the event failed, we will // simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send // the same event (idempotent) and then come here again... - if err := e.clearFinalizers(ctx, o); err != nil { + if err := e.clearFinalizer(ctx, o); err != nil { lastErr = err // retry is if there is a conflict in case the informer cache is out of sync if k8serrors.IsConflict(err) { - logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v. Error: %v. Retrying..", nsName, err) + logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v. Error: %v. Retrying..", nsName, err) return false, nil } - logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v. Error: %v", nsName, err) + logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v. Error: %v", nsName, err) return true, err } return true, nil diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 1d8d5064d9..a02c32dca1 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -987,6 +987,27 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { assert.Equal(t, 0, len(o.GetFinalizers())) }) + t.Run("Inject finalizers", func(t *testing.T) { + p := pluginsk8sMock.Plugin{} + p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectFinalizer: false}) + pluginManager := PluginManager{plugin: &p} + // enable finalizer injection + cfg.InjectFinalizer = true + o := &v1.Pod{} + pluginManager.addObjectMetadata(tm, o, cfg) + assert.Equal(t, genName, o.GetName()) + // empty OwnerReference since we are ignoring + assert.Equal(t, 1, len(o.GetOwnerReferences())) + assert.Equal(t, ns, o.GetNamespace()) + assert.Equal(t, map[string]string{ + "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", + "aKey": "aVal", + }, o.GetAnnotations()) + assert.Equal(t, l, o.GetLabels()) + assert.Equal(t, 1, len(o.GetFinalizers())) + assert.Contains(t, o.GetFinalizers(), finalizer) + }) + } func TestResourceManagerConstruction(t *testing.T) { @@ -1015,15 +1036,16 @@ func TestFinalize(t *testing.T) { tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) o := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), - Namespace: tctx.TaskExecutionMetadata().GetNamespace(), + Name: tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), + Namespace: tctx.TaskExecutionMetadata().GetNamespace(), + Finalizers: []string{finalizer}, }, } assert.NoError(t, fakeKubeClient.GetClient().Create(ctx, o)) p.OnBuildIdentityResource(ctx, tctx.TaskExecutionMetadata()).Return(o, nil) - pluginManager := PluginManager{plugin: &p, kubeClient: fakeKubeClient} + pluginManager := PluginManager{plugin: &p, kubeClient: fakeKubeClient, updateBackoffRetries: 5} actualO := &v1.Pod{} // Assert the object exists before calling finalize assert.NoError(t, fakeKubeClient.GetClient().Get(ctx, k8stypes.NamespacedName{ @@ -1061,7 +1083,7 @@ func TestFinalize(t *testing.T) { assert.NoError(t, fakeKubeClient.GetClient().Create(ctx, o)) p.OnBuildIdentityResource(ctx, tctx.TaskExecutionMetadata()).Return(o, nil) - pluginManager := PluginManager{plugin: &p, kubeClient: fakeKubeClient} + pluginManager := PluginManager{plugin: &p, kubeClient: fakeKubeClient, updateBackoffRetries: 5} actualO := &v1.Pod{} // Assert the object exists before calling finalize assert.NoError(t, fakeKubeClient.GetClient().Get(ctx, k8stypes.NamespacedName{ diff --git a/go.mod b/go.mod index 37e8aa4301..5f72dce3c4 100644 --- a/go.mod +++ b/go.mod @@ -144,7 +144,6 @@ require ( github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/ncw/swift v1.0.53 // indirect - github.com/nxadm/tail v1.4.11 // indirect github.com/ory/fosite v0.42.2 // indirect github.com/ory/go-acc v0.2.6 // indirect github.com/ory/go-convenience v0.1.0 // indirect