Skip to content

Commit

Permalink
allow for retry on typically transient k8s errors in both core contro…
Browse files Browse the repository at this point in the history
…ller and resolver for remote resolution

During both sides of remote resolution (core controller and resolver) typically transient kubernetes errors were being treated as permanent knative errors and no attempts at trying to reconcile again were made, leading to failures which could be avoided.

This changes addresses that.
  • Loading branch information
khrm authored and gabemontero committed Apr 18, 2024
1 parent de643de commit fd6ea81
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 34 deletions.
3 changes: 2 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
tresources "github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/remote"
resolutioncommon "github.com/tektoncd/pipeline/pkg/resolution/common"
resolution "github.com/tektoncd/pipeline/pkg/resolution/resource"
"github.com/tektoncd/pipeline/pkg/substitution"
"github.com/tektoncd/pipeline/pkg/trustedresources"
Expand Down Expand Up @@ -373,7 +374,7 @@ func (c *Reconciler) resolvePipelineState(
pst,
)
if err != nil {
if tresources.IsErrTransient(err) {
if resolutioncommon.IsErrTransient(err) {
return nil, err
}
if errors.Is(err, remote.ErrRequestInProgress) {
Expand Down
15 changes: 14 additions & 1 deletion pkg/reconciler/taskrun/resources/taskref.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"strings"

v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/tektoncd/pipeline/pkg/remote/resolution"
remoteresource "github.com/tektoncd/pipeline/pkg/resolution/resource"
"github.com/tektoncd/pipeline/pkg/trustedresources"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -323,7 +325,18 @@ func (l *LocalStepActionRefResolver) GetStepAction(ctx context.Context, name str

// IsErrTransient returns true if an error returned by GetTask/GetStepAction is retryable.
func IsErrTransient(err error) bool {
return strings.Contains(err.Error(), errEtcdLeaderChange)
switch {
case apierrors.IsConflict(err):
return true
case apierrors.IsServerTimeout(err):
return true
case apierrors.IsTimeout(err):
return true
default:
return slices.ContainsFunc([]string{errEtcdLeaderChange, context.DeadlineExceeded.Error()}, func(s string) bool {
return strings.Contains(err.Error(), s)
})
}
}

// convertClusterTaskToTask converts deprecated v1beta1 ClusterTasks to Tasks for
Expand Down
5 changes: 3 additions & 2 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/remote"
resolutioncommon "github.com/tektoncd/pipeline/pkg/resolution/common"
resolution "github.com/tektoncd/pipeline/pkg/resolution/resource"
"github.com/tektoncd/pipeline/pkg/spire"
"github.com/tektoncd/pipeline/pkg/taskrunmetrics"
Expand Down Expand Up @@ -409,7 +410,7 @@ func (c *Reconciler) prepare(ctx context.Context, tr *v1.TaskRun) (*v1.TaskSpec,
return nil, nil, err
case err != nil:
logger.Errorf("Failed to determine Task spec to use for taskrun %s: %v", tr.Name, err)
if resources.IsErrTransient(err) {
if resolutioncommon.IsErrTransient(err) {
return nil, nil, err
}
tr.Status.MarkResourceFailed(v1.TaskRunReasonFailedResolution, err)
Expand All @@ -434,7 +435,7 @@ func (c *Reconciler) prepare(ctx context.Context, tr *v1.TaskRun) (*v1.TaskSpec,
return nil, nil, err
case err != nil:
logger.Errorf("Failed to determine StepAction to use for TaskRun %s: %v", tr.Name, err)
if resources.IsErrTransient(err) {
if resolutioncommon.IsErrTransient(err) {
return nil, nil, err
}
tr.Status.MarkResourceFailed(v1.TaskRunReasonFailedResolution, err)
Expand Down
66 changes: 37 additions & 29 deletions pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1962,38 +1962,46 @@ spec:
Tasks: []*v1.Task{simpleTask},
ClusterTasks: []*v1beta1.ClusterTask{},
}
testAssets, cancel := getTaskRunController(t, d)
defer cancel()
c := testAssets.Controller
clients := testAssets.Clients
createServiceAccount(t, testAssets, "default", tr.Namespace)
for _, v := range []error{
errors.New("etcdserver: leader changed"),
context.DeadlineExceeded,
apierrors.NewConflict(pipeline.TaskRunResource, "", nil),
apierrors.NewServerTimeout(pipeline.TaskRunResource, "", 0),
apierrors.NewTimeoutError("", 0),
} {
testAssets, cancel := getTaskRunController(t, d)
defer cancel()
c := testAssets.Controller
clients := testAssets.Clients
createServiceAccount(t, testAssets, "default", tr.Namespace)

failingReactorActivated := true
clients.Pipeline.PrependReactor("*", "tasks", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return failingReactorActivated, &v1.Task{}, errors.New("etcdserver: leader changed")
})
err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr))
if err == nil {
t.Error("Wanted a wrapped error, but got nil.")
}
if controller.IsPermanentError(err) {
t.Errorf("Unexpected permanent error %v", err)
}
failingReactorActivated := true
clients.Pipeline.PrependReactor("*", "tasks", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return failingReactorActivated, &v1.Task{}, v
})
err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr))
if err == nil {
t.Error("Wanted a wrapped error, but got nil.")
}
if controller.IsPermanentError(err) {
t.Errorf("Unexpected permanent error %v", err)
}

failingReactorActivated = false
err = c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr))
if err != nil {
if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("unexpected error in TaskRun reconciliation: %v", err)
failingReactorActivated = false
err = c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr))
if err != nil {
if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("unexpected error in TaskRun reconciliation: %v", err)
}
}
reconciledRun, err := clients.Pipeline.TektonV1().TaskRuns("foo").Get(testAssets.Ctx, tr.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err)
}
condition := reconciledRun.Status.GetCondition(apis.ConditionSucceeded)
if !condition.IsUnknown() {
t.Errorf("Expected TaskRun to still be running but succeeded condition is %v", condition.Status)
}
}
reconciledRun, err := clients.Pipeline.TektonV1().TaskRuns("foo").Get(testAssets.Ctx, tr.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err)
}
condition := reconciledRun.Status.GetCondition(apis.ConditionSucceeded)
if !condition.IsUnknown() {
t.Errorf("Expected TaskRun to still be running but succeeded condition is %v", condition.Status)
}
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/resolution/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,21 @@ limitations under the License.
package common

import (
"context"
"errors"
"fmt"
"slices"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// This error is defined in etcd at
// https://github.com/etcd-io/etcd/blob/5b226e0abf4100253c94bb71f47d6815877ed5a2/server/etcdserver/errors.go#L30
// TODO: If/when https://github.com/kubernetes/kubernetes/issues/106491 is addressed,
// we should stop relying on a hardcoded string.
var errEtcdLeaderChange = "etcdserver: leader changed"

// Error embeds both a short machine-readable string reason for resolution
// problems alongside the original error generated during the resolution flow.
type Error struct {
Expand Down Expand Up @@ -165,3 +176,19 @@ func ReasonError(err error) (string, error) {

return reason, resolutionError
}

// IsErrTransient returns true if an error returned by GetTask/GetStepAction is retryable.
func IsErrTransient(err error) bool {
switch {
case apierrors.IsConflict(err):
return true
case apierrors.IsServerTimeout(err):
return true
case apierrors.IsTimeout(err):
return true
default:
return slices.ContainsFunc([]string{errEtcdLeaderChange, context.DeadlineExceeded.Error()}, func(s string) bool {
return strings.Contains(err.Error(), s)
})
}
}
3 changes: 3 additions & 0 deletions pkg/resolution/resolver/framework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ func (r *Reconciler) OnError(ctx context.Context, rr *v1beta1.ResolutionRequest,
return controller.NewPermanentError(err)
}
if err != nil {
if resolutioncommon.IsErrTransient(err) {
return err
}
_ = r.MarkFailed(ctx, rr, err)
return controller.NewPermanentError(err)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/resolution/resolver/framework/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/resolution/v1beta1"
ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing"
resolutioncommon "github.com/tektoncd/pipeline/pkg/resolution/common"
framework "github.com/tektoncd/pipeline/pkg/resolution/resolver/framework"
"github.com/tektoncd/pipeline/pkg/resolution/resolver/framework"
"github.com/tektoncd/pipeline/test"
"github.com/tektoncd/pipeline/test/diff"
"github.com/tektoncd/pipeline/test/names"
Expand Down Expand Up @@ -62,6 +62,7 @@ func TestReconcile(t *testing.T) {
reconcilerTimeout time.Duration
expectedStatus *v1beta1.ResolutionRequestStatus
expectedErr error
transient bool
}{
{
name: "unknown value",
Expand Down Expand Up @@ -206,6 +207,7 @@ func TestReconcile(t *testing.T) {
},
reconcilerTimeout: 1 * time.Second,
expectedErr: errors.New("context deadline exceeded"),
transient: true,
},
}

Expand All @@ -232,6 +234,9 @@ func TestReconcile(t *testing.T) {
if tc.expectedErr.Error() != err.Error() {
t.Fatalf("expected to get error %v, but got %v", tc.expectedErr, err)
}
if tc.transient && controller.IsPermanentError(err) {
t.Fatalf("exepected error to not be wrapped as permanent %v", err)
}
} else {
if err != nil {
if ok, _ := controller.IsRequeueKey(err); !ok {
Expand Down

0 comments on commit fd6ea81

Please sign in to comment.