Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace snooze with NewRequeueKey #4131

Merged
merged 1 commit into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package pipelinerun

import (
"context"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
Expand All @@ -35,12 +34,10 @@ import (
resourceinformer "github.com/tektoncd/pipeline/pkg/client/resource/injection/informers/resource/v1alpha1/pipelineresource"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"
)

Expand Down Expand Up @@ -88,13 +85,6 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
}
})

c.snooze = func(acc kmeta.Accessor, amnt time.Duration) {
impl.EnqueueKeyAfter(types.NamespacedName{
Namespace: acc.GetNamespace(),
Name: acc.GetName(),
}, amnt)
}

logger.Info("Setting up event handlers")
pipelineRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

Expand Down
23 changes: 11 additions & 12 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ type Reconciler struct {
cloudEventClient cloudevent.CEClient
metrics *Recorder
pvcHandler volumeclaim.PvcHandler

snooze func(kmeta.Accessor, time.Duration)
}

var (
Expand Down Expand Up @@ -221,23 +219,24 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
logger.Errorf("Error while syncing the pipelinerun status: %v", err.Error())
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
}
defer func() {
if pr.Status.StartTime == nil {
return
}
// Compute the time since the task started.
elapsed := time.Since(pr.Status.StartTime.Time)
// Snooze this resource until the timeout has elapsed.
c.snooze(pr, pr.GetTimeout(ctx)-elapsed)
}()

// Reconcile this copy of the pipelinerun and then write back any status or label
// updates regardless of whether the reconciliation errored out.
if err = c.reconcile(ctx, pr, getPipelineFunc); err != nil {
logger.Errorf("Reconcile error: %v", err.Error())
}

return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
if err = c.finishReconcileUpdateEmitEvents(ctx, pr, before, err); err != nil {
return err
}

if pr.Status.StartTime != nil {
// Compute the time since the task started.
elapsed := time.Since(pr.Status.StartTime.Time)
// Snooze this resource until the timeout has elapsed.
return controller.NewRequeueAfter(pr.GetTimeout(ctx) - elapsed)
}
return nil
}

func (c *Reconciler) finishReconcileUpdateEmitEvents(ctx context.Context, pr *v1beta1.PipelineRun, beforeCondition *apis.Condition, previousError error) error {
Expand Down
6 changes: 5 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2186,7 +2186,9 @@ func TestReconcileCancelledRunFinallyFailsTaskRunCancellation(t *testing.T) {
failingReactorActivated = false

err = c.Reconciler.Reconcile(testAssets.Ctx, "foo/test-pipeline-fails-to-cancel")
if err != nil {
if err == nil {
// No error is ok
} else if ok, _ := controller.IsRequeueKey(err); !ok { // Requeue is also fine.
t.Errorf("Expected to cancel TaskRun successfully!")
}

Expand Down Expand Up @@ -6070,6 +6072,8 @@ func (prt PipelineRunTest) reconcileRun(namespace, pipelineRunName string, wantE
if controller.IsPermanentError(reconcileError) != permanentError {
prt.Test.Fatalf("Expected the error to be permanent: %v but got: %v", permanentError, controller.IsPermanentError(reconcileError))
}
} else if ok, _ := controller.IsRequeueKey(reconcileError); ok {
// This is normal, it happens for timeouts when we otherwise successfully reconcile.
} else if reconcileError != nil {
prt.Test.Fatalf("Error reconciling: %s", reconcileError)
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package taskrun

import (
"context"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
Expand All @@ -32,13 +31,11 @@ import (
"github.com/tektoncd/pipeline/pkg/pod"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
filteredpodinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"
)

Expand Down Expand Up @@ -86,13 +83,6 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
}
})

c.snooze = func(acc kmeta.Accessor, amnt time.Duration) {
impl.EnqueueKeyAfter(types.NamespacedName{
Namespace: acc.GetNamespace(),
Name: acc.GetName(),
}, amnt)
}

logger.Info("Setting up event handlers")
taskRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

Expand Down
23 changes: 11 additions & 12 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ type Reconciler struct {
entrypointCache podconvert.EntrypointCache
metrics *Recorder
pvcHandler volumeclaim.PvcHandler

snooze func(kmeta.Accessor, time.Duration)
}

// Check that our Reconciler implements taskrunreconciler.Interface
Expand Down Expand Up @@ -163,15 +161,6 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
err := c.failTaskRun(ctx, tr, v1beta1.TaskRunReasonTimedOut, message)
return c.finishReconcileUpdateEmitEvents(ctx, tr, before, err)
}
defer func() {
if tr.Status.StartTime == nil {
return
}
// Compute the time since the task started.
elapsed := time.Since(tr.Status.StartTime.Time)
// Snooze this resource until the timeout has elapsed.
c.snooze(tr, tr.GetTimeout(ctx)-elapsed)
}()

// prepare fetches all required resources, validates them together with the
// taskrun, runs API convertions. Errors that come out of prepare are
Expand All @@ -194,7 +183,17 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
}

// Emit events (only when ConditionSucceeded was changed)
return c.finishReconcileUpdateEmitEvents(ctx, tr, before, err)
if err = c.finishReconcileUpdateEmitEvents(ctx, tr, before, err); err != nil {
return err
}

if tr.Status.StartTime != nil {
// Compute the time since the task started.
elapsed := time.Since(tr.Status.StartTime.Time)
// Snooze this resource until the timeout has elapsed.
return controller.NewRequeueAfter(tr.GetTimeout(ctx) - elapsed)
}
return nil
}
func (c *Reconciler) stopSidecars(ctx context.Context, tr *v1beta1.TaskRun) (*corev1.Pod, error) {
logger := logging.FromContext(ctx)
Expand Down
76 changes: 48 additions & 28 deletions pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,9 @@ func TestReconcile_ExplicitDefaultSA(t *testing.T) {
c := testAssets.Controller
clients := testAssets.Clients

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error. Got error %v", err)
}
if len(clients.Kube.Actions()) == 0 {
Expand Down Expand Up @@ -726,7 +728,9 @@ func TestReconcile_FeatureFlags(t *testing.T) {
}, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error. Got error %v", err)
}
if len(clients.Kube.Actions()) == 0 {
Expand Down Expand Up @@ -811,7 +815,9 @@ func TestReconcile_CloudEvents(t *testing.T) {
t.Fatal(err)
}

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error. Got error %v", err)
}
if len(clients.Kube.Actions()) == 0 {
Expand Down Expand Up @@ -1550,7 +1556,9 @@ func TestReconcile(t *testing.T) {
t.Fatal(err)
}

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error. Got error %v", err)
}
if len(clients.Kube.Actions()) == 0 {
Expand Down Expand Up @@ -1620,7 +1628,9 @@ func TestReconcile_SetsStartTime(t *testing.T) {
t.Fatal(err)
}

if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err != nil {
if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error reconciling valid TaskRun but got %v", err)
}

Expand Down Expand Up @@ -1774,7 +1784,7 @@ func TestReconcileTaskRunWithPermanentError(t *testing.T) {
// Such TaskRun enters Reconciler and from within the isDone block, marks the run success so that
// reconciler does not keep trying to reconcile
if reconcileErr != nil {
t.Fatalf("Expected to see no error when reconciling TaskRun with Permanent Error but was not none")
t.Fatalf("Expected to see error when reconciling TaskRun with Permanent Error but was not none")
}

// Check actions
Expand Down Expand Up @@ -1877,7 +1887,9 @@ func TestReconcilePodUpdateStatus(t *testing.T) {
c := testAssets.Controller
clients := testAssets.Clients

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Fatalf("Unexpected error when Reconcile() : %v", err)
}
newTr, err := clients.Pipeline.TektonV1beta1().TaskRuns(taskRun.Namespace).Get(testAssets.Ctx, taskRun.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -1913,7 +1925,9 @@ func TestReconcilePodUpdateStatus(t *testing.T) {
// lister cache is update to reflect the result of the previous Reconcile.
testAssets.Informers.TaskRun.Informer().GetIndexer().Add(newTr)

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Fatalf("Unexpected error when Reconcile(): %v", err)
}

Expand Down Expand Up @@ -2178,13 +2192,10 @@ func TestHandlePodCreationError(t *testing.T) {
taskLister: testAssets.Informers.Task.Lister(),
clusterTaskLister: testAssets.Informers.ClusterTask.Lister(),
resourceLister: testAssets.Informers.PipelineResource.Lister(),
snooze: func(acc kmeta.Accessor, amnt time.Duration) {
t.Error("Unexpected call to snooze.")
},
cloudEventClient: testAssets.Clients.CloudEvents,
metrics: nil, // Not used
entrypointCache: nil, // Not used
pvcHandler: volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
cloudEventClient: testAssets.Clients.CloudEvents,
metrics: nil, // Not used
entrypointCache: nil, // Not used
pvcHandler: volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
}

testcases := []struct {
Expand Down Expand Up @@ -2393,7 +2404,9 @@ func TestReconcileCloudEvents(t *testing.T) {
t.Fatal(err)
}

if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tc.taskRun)); err == nil {
// No error is ok.
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error. Got error %v", err)
}

Expand Down Expand Up @@ -2611,7 +2624,9 @@ func TestReconcileValidDefaultWorkspace(t *testing.T) {
t.Fatal(err)
}

if err := testAssets.Controller.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err != nil {
if err := testAssets.Controller.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)); err == nil {
// No error is ok.
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("Expected no error reconciling valid TaskRun but got %v", err)
}

Expand Down Expand Up @@ -2734,7 +2749,9 @@ func TestReconcileValidDefaultWorkspaceOmittedOptionalWorkspace(t *testing.T) {
t.Fatal(err)
}

if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRunOmittingWorkspace)); err != nil {
if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRunOmittingWorkspace)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("Unexpected reconcile error for TaskRun %q: %v", taskRunOmittingWorkspace.Name, err)
}

Expand Down Expand Up @@ -2963,7 +2980,9 @@ func TestReconcileWorkspaceWithVolumeClaimTemplate(t *testing.T) {
t.Fatal(err)
}

if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err != nil {
if err := testAssets.Controller.Reconciler.Reconcile(context.Background(), getRunName(taskRun)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("expected no error reconciling valid TaskRun but got %v", err)
}

Expand Down Expand Up @@ -3231,13 +3250,10 @@ func TestFailTaskRun(t *testing.T) {
taskLister: testAssets.Informers.Task.Lister(),
clusterTaskLister: testAssets.Informers.ClusterTask.Lister(),
resourceLister: testAssets.Informers.PipelineResource.Lister(),
snooze: func(acc kmeta.Accessor, amnt time.Duration) {
t.Error("Unexpected call to snooze.")
},
cloudEventClient: testAssets.Clients.CloudEvents,
metrics: nil, // Not used
entrypointCache: nil, // Not used
pvcHandler: volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
cloudEventClient: testAssets.Clients.CloudEvents,
metrics: nil, // Not used
entrypointCache: nil, // Not used
pvcHandler: volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
}

err := c.failTaskRun(testAssets.Ctx, tc.taskRun, tc.reason, tc.message)
Expand Down Expand Up @@ -3378,7 +3394,9 @@ func TestPodAdoption(t *testing.T) {
}

// Reconcile the TaskRun. This creates a Pod.
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("Error reconciling TaskRun. Got error %v", err)
}

Expand Down Expand Up @@ -3411,7 +3429,9 @@ func TestPodAdoption(t *testing.T) {
}

// Reconcile the TaskRun again.
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr)); err != nil {
if err := c.Reconciler.Reconcile(testAssets.Ctx, getRunName(tr)); err == nil {
t.Error("Wanted a wrapped requeue error, but got nil.")
} else if ok, _ := controller.IsRequeueKey(err); !ok {
t.Errorf("Error reconciling TaskRun again. Got error %v", err)
}

Expand Down