diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go
index f8928f80c05..fb26337595c 100644
--- a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go
+++ b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go
@@ -17,7 +17,6 @@ limitations under the License.
 package v1beta1

 import (
-	"fmt"
 	"time"

 	"github.com/tektoncd/pipeline/pkg/apis/config"
@@ -25,6 +24,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
 	"knative.dev/pkg/apis"
 	duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
 )
@@ -98,10 +98,9 @@ func (pr *PipelineRun) IsCancelled() bool {
 	return pr.Spec.Status == PipelineRunSpecStatusCancelled
 }

-// GetRunKey return the pipelinerun key for timeout handler map
-func (pr *PipelineRun) GetRunKey() string {
-	// The address of the pointer is a threadsafe unique identifier for the pipelinerun
-	return fmt.Sprintf("%s/%p", pipeline.PipelineRunControllerName, pr)
+// GetNamespacedName returns a k8s namespaced name that identifies this PipelineRun
+func (pr *PipelineRun) GetNamespacedName() types.NamespacedName {
+	return types.NamespacedName{Namespace: pr.Namespace, Name: pr.Name}
 }

 // IsTimedOut returns true if a pipelinerun has exceeded its spec.Timeout based on its status.Timeout
@@ -231,7 +230,7 @@ const (
 	PipelineRunReasonCancelled PipelineRunReason = "Cancelled"
 	// PipelineRunReasonTimedOut is the reason set when the PipelineRun has timed out
 	PipelineRunReasonTimedOut PipelineRunReason = "PipelineRunTimeout"
-	// ReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
+	// PipelineRunReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
 	// pipeline will stop once all running tasks complete their work
 	PipelineRunReasonStopping PipelineRunReason = "PipelineRunStopping"
 )
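Reviewer note, not part of the patch: `types.NamespacedName` is a plain comparable struct from k8s.io/apimachinery, and its `String()` method joins the two fields with a slash — exactly the `"foo/prunname"` shape the updated tests below assert. A minimal sketch:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

func main() {
	n := types.NamespacedName{Namespace: "foo", Name: "prunname"}
	// String() joins namespace and name with a slash; this is the
	// stable map key the timeout handler now uses.
	fmt.Println(n.String()) // prints: foo/prunname
}
```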
diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go b/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
index 8388361bd22..ec2a5eb0cd9 100644
--- a/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
+++ b/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
@@ -17,7 +17,6 @@ limitations under the License.
 package v1beta1_test

 import (
-	"fmt"
 	"testing"
 	"time"

@@ -167,11 +166,12 @@ func TestPipelineRunHasVolumeClaimTemplate(t *testing.T) {
 	}
 }

-func TestPipelineRunKey(t *testing.T) {
-	pr := tb.PipelineRun("prunname")
-	expectedKey := fmt.Sprintf("PipelineRun/%p", pr)
-	if pr.GetRunKey() != expectedKey {
-		t.Fatalf("Expected taskrun key to be %s but got %s", expectedKey, pr.GetRunKey())
+func TestGetNamespacedName(t *testing.T) {
+	pr := tb.PipelineRun("prunname", tb.PipelineRunNamespace("foo"))
+	n := pr.GetNamespacedName()
+	expected := "foo/prunname"
+	if n.String() != expected {
+		t.Fatalf("Expected name to be %s but got %s", expected, n.String())
 	}
 }
diff --git a/pkg/apis/pipeline/v1beta1/taskrun_types.go b/pkg/apis/pipeline/v1beta1/taskrun_types.go
index a314760bcef..e91f70aceec 100644
--- a/pkg/apis/pipeline/v1beta1/taskrun_types.go
+++ b/pkg/apis/pipeline/v1beta1/taskrun_types.go
@@ -25,6 +25,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
 	"knative.dev/pkg/apis"
 	duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
 )
@@ -411,10 +412,9 @@ func (tr *TaskRun) GetTimeout() time.Duration {
 	return tr.Spec.Timeout.Duration
 }

-// GetRunKey return the taskrun key for timeout handler map
-func (tr *TaskRun) GetRunKey() string {
-	// The address of the pointer is a threadsafe unique identifier for the taskrun
-	return fmt.Sprintf("%s/%p", pipeline.TaskRunControllerName, tr)
+// GetNamespacedName returns a k8s namespaced name that identifies this TaskRun
+func (tr *TaskRun) GetNamespacedName() types.NamespacedName {
+	return types.NamespacedName{Namespace: tr.Namespace, Name: tr.Name}
 }

 // IsPartOfPipeline return true if TaskRun is a part of a Pipeline.
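Why the `%p`-based key had to go, sketched under the assumption (true of client-go informers) that each reconcile may see a distinct in-memory copy of the same object: pointer identity differs between copies, so a timer registered under one copy's key could never be found again, while `NamespacedName` values compare equal.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

type run struct{ Namespace, Name string }

// key mirrors the old GetRunKey: a map key derived from the pointer address.
func key(r *run) string { return fmt.Sprintf("TaskRun/%p", r) }

func main() {
	a := &run{"foo", "trunname"}
	b := &run{"foo", "trunname"} // a second copy of the "same" run
	fmt.Println(key(a) == key(b)) // false: pointer keys differ per copy
	na := types.NamespacedName{Namespace: a.Namespace, Name: a.Name}
	nb := types.NamespacedName{Namespace: b.Namespace, Name: b.Name}
	fmt.Println(na == nb) // true: the namespaced name identifies the run itself
}
```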
diff --git a/pkg/apis/pipeline/v1beta1/taskrun_types_test.go b/pkg/apis/pipeline/v1beta1/taskrun_types_test.go
index 7b8195b9328..59fd607378e 100644
--- a/pkg/apis/pipeline/v1beta1/taskrun_types_test.go
+++ b/pkg/apis/pipeline/v1beta1/taskrun_types_test.go
@@ -17,11 +17,11 @@ limitations under the License.
 package v1beta1_test

 import (
-	"fmt"
 	"testing"
 	"time"

 	"github.com/google/go-cmp/cmp"
+	tb "github.com/tektoncd/pipeline/internal/builder/v1beta1"
 	"github.com/tektoncd/pipeline/pkg/apis/pipeline"
 	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
 	"github.com/tektoncd/pipeline/test/diff"
@@ -172,14 +172,11 @@ func TestTaskRunHasVolumeClaimTemplate(t *testing.T) {
 }

 func TestTaskRunKey(t *testing.T) {
-	tr := &v1beta1.TaskRun{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "taskrunname",
-		},
-	}
-	expectedKey := fmt.Sprintf("TaskRun/%p", tr)
-	if tr.GetRunKey() != expectedKey {
-		t.Fatalf("Expected taskrun key to be %s but got %s", expectedKey, tr.GetRunKey())
+	tr := tb.TaskRun("trunname", tb.TaskRunNamespace("foo"))
+	n := tr.GetNamespacedName()
+	expected := "foo/trunname"
+	if n.String() != expected {
+		t.Fatalf("Expected name to be %s but got %s", expected, n.String())
 	}
 }
diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go
index af9b2b44bed..3f6f695169c 100644
--- a/pkg/reconciler/pipelinerun/controller.go
+++ b/pkg/reconciler/pipelinerun/controller.go
@@ -86,8 +86,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
 		}
 	})

-	timeoutHandler.SetPipelineRunCallbackFunc(impl.Enqueue)
-	timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset)
+	timeoutHandler.SetCallbackFunc(impl.EnqueueKey)
+	timeoutHandler.CheckTimeouts(ctx, namespace, kubeclientset, pipelineclientset)

 	logger.Info("Setting up event handlers")
 	pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
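The wiring above hands `impl.EnqueueKey` straight to `SetCallbackFunc`, which only type-checks if knative's `EnqueueKey` takes a `types.NamespacedName` — that is the single callback contract this refactor standardizes on. A self-contained sketch of the plumbing, where `handler` is a toy stand-in for `timeout.Handler` and `enqueue` stands in for `impl.EnqueueKey`:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// enqueue stands in for knative's impl.EnqueueKey; since the patch passes
// impl.EnqueueKey to SetCallbackFunc, its signature must be
// func(types.NamespacedName).
func enqueue(n types.NamespacedName) { fmt.Println("requeued", n.String()) }

// handler is a toy stand-in for timeout.Handler, just to show the wiring.
type handler struct{ callbackFunc func(types.NamespacedName) }

func (h *handler) SetCallbackFunc(f func(types.NamespacedName)) { h.callbackFunc = f }

func main() {
	h := &handler{}
	h.SetCallbackFunc(enqueue)
	// When a timer fires, the handler calls back with the run's key and
	// the reconciler re-reconciles it, observing the elapsed timeout.
	h.callbackFunc(types.NamespacedName{Namespace: "default", Name: "my-run"})
}
```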
diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go
index ac1b5772730..06e35c89df6 100644
--- a/pkg/reconciler/pipelinerun/pipelinerun.go
+++ b/pkg/reconciler/pipelinerun/pipelinerun.go
@@ -140,11 +140,11 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
 		pr.Status.InitializeConditions()
 		// In case node time was not synchronized, when controller has been scheduled to other nodes.
 		if pr.Status.StartTime.Sub(pr.CreationTimestamp.Time) < 0 {
-			logger.Warnf("PipelineRun %s createTimestamp %s is after the pipelineRun started %s", pr.GetRunKey(), pr.CreationTimestamp, pr.Status.StartTime)
+			logger.Warnf("PipelineRun %s createTimestamp %s is after the pipelineRun started %s", pr.GetNamespacedName().String(), pr.CreationTimestamp, pr.Status.StartTime)
 			pr.Status.StartTime = &pr.CreationTimestamp
 		}
 		// start goroutine to track pipelinerun timeout only startTime is not set
-		go c.timeoutHandler.WaitPipelineRun(pr, pr.Status.StartTime)
+		go c.timeoutHandler.Wait(pr.GetNamespacedName(), *pr.Status.StartTime, *pr.Spec.Timeout)
 		// Emit events. During the first reconcile the status of the PipelineRun may change twice
 		// from not Started to Started and then to Running, so we need to sent the event here
 		// and at the end of 'Reconcile' again.
@@ -171,7 +171,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
 			logger.Errorf("Failed to delete StatefulSet for PipelineRun %s: %v", pr.Name, err)
 			return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
 		}
-		c.timeoutHandler.Release(pr)
+		c.timeoutHandler.Release(pr.GetNamespacedName())
 		if err := c.updateTaskRunsStatusDirectly(pr); err != nil {
 			logger.Errorf("Failed to update TaskRun status for PipelineRun %s: %v", pr.Name, err)
 			return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go
index 1e728061dfb..162053b52d7 100644
--- a/pkg/reconciler/taskrun/controller.go
+++ b/pkg/reconciler/taskrun/controller.go
@@ -87,8 +87,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
 		}
 	})

-	timeoutHandler.SetTaskRunCallbackFunc(impl.Enqueue)
-	timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset)
+	timeoutHandler.SetCallbackFunc(impl.EnqueueKey)
+	timeoutHandler.CheckTimeouts(ctx, namespace, kubeclientset, pipelineclientset)

 	logger.Info("Setting up event handlers")
 	taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
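One thing to watch at the call sites above (my reading, not asserted by the patch): `Wait` now receives values, so `*pr.Status.StartTime` and `*pr.Spec.Timeout` are dereferenced unconditionally. That is safe only because `InitializeConditions` has set `StartTime` and defaulting has populated `Spec.Timeout` by this point. A sketch of the guard this implies — the `waitArgs` helper and its error are illustrative, not in the patch:

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// waitArgs mimics the dereference now done at the call sites: it is only
// safe because SetDefaults fills Spec.Timeout and InitializeConditions
// fills Status.StartTime before Wait is called.
func waitArgs(startTime *metav1.Time, timeout *metav1.Duration) (metav1.Time, metav1.Duration, error) {
	if startTime == nil || timeout == nil {
		return metav1.Time{}, metav1.Duration{}, fmt.Errorf("startTime and timeout must be defaulted before Wait")
	}
	return *startTime, *timeout, nil
}

func main() {
	st := metav1.NewTime(time.Now())
	to := &metav1.Duration{Duration: time.Hour}
	if s, d, err := waitArgs(&st, to); err == nil {
		fmt.Println(s.Time, d.Duration)
	}
}
```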
diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go
index 5b919cc09b5..df9421cc689 100644
--- a/pkg/reconciler/taskrun/taskrun.go
+++ b/pkg/reconciler/taskrun/taskrun.go
@@ -93,7 +93,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
 		tr.Status.InitializeConditions()
 		// In case node time was not synchronized, when controller has been scheduled to other nodes.
 		if tr.Status.StartTime.Sub(tr.CreationTimestamp.Time) < 0 {
-			logger.Warnf("TaskRun %s createTimestamp %s is after the taskRun started %s", tr.GetRunKey(), tr.CreationTimestamp, tr.Status.StartTime)
+			logger.Warnf("TaskRun %s createTimestamp %s is after the taskRun started %s", tr.GetNamespacedName().String(), tr.CreationTimestamp, tr.Status.StartTime)
 			tr.Status.StartTime = &tr.CreationTimestamp
 		}
 		// Emit events. During the first reconcile the status of the TaskRun may change twice
@@ -120,7 +120,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
 		// send cloud events. So we stop here an return errors encountered this far.
 		return merr.ErrorOrNil()
 	}
-	c.timeoutHandler.Release(tr)
+	c.timeoutHandler.Release(tr.GetNamespacedName())
 	pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
 	if err == nil {
 		err = podconvert.StopSidecars(c.Images.NopImage, c.KubeClientSet, *pod)
@@ -379,7 +379,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun,
 			logger.Error("Failed to create task run pod for task %q: %v", tr.Name, newErr)
 			return newErr
 		}
-		go c.timeoutHandler.WaitTaskRun(tr, tr.Status.StartTime)
+		go c.timeoutHandler.Wait(tr.GetNamespacedName(), *tr.Status.StartTime, *tr.Spec.Timeout)
 	}
 	if err := c.tracker.Track(tr.GetBuildPodRef(), tr); err != nil {
 		logger.Errorf("Failed to create tracker for build pod %q for taskrun %q: %v", tr.Name, tr.Name, err)
@@ -460,9 +460,9 @@ func (c *Reconciler) updateLabelsAndAnnotations(tr *v1beta1.TaskRun) (*v1beta1.T
 func (c *Reconciler) handlePodCreationError(ctx context.Context, tr *v1beta1.TaskRun, err error) error {
 	var msg string
 	if isExceededResourceQuotaError(err) {
-		backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr)
+		backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr.GetNamespacedName(), *tr.Status.StartTime, *tr.Spec.Timeout)
 		if !currentlyBackingOff {
-			go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt))
+			go c.timeoutHandler.SetTimer(tr.GetNamespacedName(), time.Until(backoff.NextAttempt))
 		}
 		msg = fmt.Sprintf("TaskRun Pod exceeded available resources, reattempted %d times", backoff.NumAttempts)
 		tr.Status.SetCondition(&apis.Condition{
diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go
index 964bb449b23..149392ec374 100644
--- a/pkg/reconciler/taskrun/taskrun_test.go
+++ b/pkg/reconciler/taskrun/taskrun_test.go
@@ -2075,7 +2075,7 @@ func TestHandlePodCreationError(t *testing.T) {
 	}

 	// Prevent backoff timer from starting
-	c.timeoutHandler.SetTaskRunCallbackFunc(nil)
+	c.timeoutHandler.SetCallbackFunc(nil)

 	testcases := []struct {
 		description string
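The retry flow in `handlePodCreationError`, reduced to its core: ask for a backoff, and only arm a timer if no backoff was already pending. A self-contained sketch of that pattern (names local to this example; the jitter and the `maxBackoffExponent` cap from the real `backoffDuration` are omitted):

```go
package main

import (
	"fmt"
	"time"
)

type backoff struct {
	NumAttempts uint
	NextAttempt time.Time
}

var backoffs = map[string]backoff{}

// getBackoff mirrors the Handler.GetBackoff contract: an in-progress
// backoff is returned as-is; otherwise the attempt count is bumped and a
// new deadline computed (doubling, clamped to the run's own timeout).
func getBackoff(key string, timeoutDeadline time.Time) (backoff, bool) {
	b := backoffs[key]
	if time.Now().Before(b.NextAttempt) {
		return b, true
	}
	b.NumAttempts++
	b.NextAttempt = time.Now().Add(time.Duration(1<<b.NumAttempts) * time.Second)
	if timeoutDeadline.Before(b.NextAttempt) {
		b.NextAttempt = timeoutDeadline
	}
	backoffs[key] = b
	return b, false
}

func main() {
	deadline := time.Now().Add(10 * time.Second)
	b, inProgress := getBackoff("foo/my-taskrun", deadline)
	if !inProgress {
		// The reconciler's equivalent: go timeoutHandler.SetTimer(n, time.Until(b.NextAttempt))
		fmt.Printf("attempt %d, retry in %s\n", b.NumAttempts, time.Until(b.NextAttempt).Round(time.Second))
	}
}
```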
diff --git a/pkg/timeout/handler.go b/pkg/timeout/handler.go
index 0cf74446939..6c8c0179d0c 100644
--- a/pkg/timeout/handler.go
+++ b/pkg/timeout/handler.go
@@ -17,17 +17,18 @@ limitations under the License.
 package timeout

 import (
+	"context"
 	"math"
 	"math/rand"
 	"sync"
 	"time"

-	"github.com/tektoncd/pipeline/pkg/apis/config"
-	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
 	clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
+	"github.com/tektoncd/pipeline/pkg/contexts"
 	"go.uber.org/zap"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
 )

@@ -36,7 +37,6 @@ const (
 )

 var (
-	defaultFunc = func(i interface{}) {}
 	maxBackoffExponent = math.Ceil(math.Log2(maxBackoffSeconds))
 )

@@ -63,12 +63,9 @@ type jitterFunc func(numSeconds int) (jitteredSeconds int)
 type Handler struct {
 	logger *zap.SugaredLogger

-	// taskRunCallbackFunc is the function to call when a TaskRun has timed out.
-	// This is usually set to the function that enqueues the taskRun for reconciling.
-	taskRunCallbackFunc func(interface{})
-	// pipelineRunCallbackFunc is the function to call when a TaskRun has timed out
-	// This is usually set to the function that enqueues the taskRun for reconciling.
-	pipelineRunCallbackFunc func(interface{})
+	// callbackFunc is the function to call when a run has timed out.
+	// This is usually set to the function that enqueues the namespaced name for reconciling.
+	callbackFunc func(types.NamespacedName)
 	// stopCh is used to signal to all goroutines that they should stop, e.g. because
 	// the reconciler is stopping
 	stopCh <-chan struct{}
@@ -96,71 +93,64 @@ func NewHandler(
 	}
 }

-// SetTaskRunCallbackFunc sets the callback function when timeout occurs for taskrun objects
-func (t *Handler) SetTaskRunCallbackFunc(f func(interface{})) {
-	t.taskRunCallbackFunc = f
+// SetCallbackFunc sets the callback function when timeout occurs
+func (t *Handler) SetCallbackFunc(f func(types.NamespacedName)) {
+	t.callbackFunc = f
 }

-// SetPipelineRunCallbackFunc sets the callback function when timeout occurs for pipelinerun objects
-func (t *Handler) SetPipelineRunCallbackFunc(f func(interface{})) {
-	t.pipelineRunCallbackFunc = f
-}
-
-// Release deletes channels and data that are specific to a StatusKey object.
-func (t *Handler) Release(runObj StatusKey) {
-	key := runObj.GetRunKey()
+// Release deletes channels and data that are specific to a namespacedName
+func (t *Handler) Release(n types.NamespacedName) {
 	t.doneMut.Lock()
 	defer t.doneMut.Unlock()
 	t.backoffsMut.Lock()
 	defer t.backoffsMut.Unlock()

-	if done, ok := t.done[key]; ok {
-		delete(t.done, key)
+	if done, ok := t.done[n.String()]; ok {
+		delete(t.done, n.String())
 		close(done)
 	}
-	delete(t.backoffs, key)
+	delete(t.backoffs, n.String())
 }

-func (t *Handler) getOrCreateDoneChan(runObj StatusKey) chan bool {
-	key := runObj.GetRunKey()
+func (t *Handler) getOrCreateDoneChan(n types.NamespacedName) chan bool {
 	t.doneMut.Lock()
 	defer t.doneMut.Unlock()
 	var done chan bool
 	var ok bool
-	if done, ok = t.done[key]; !ok {
+	if done, ok = t.done[n.String()]; !ok {
 		done = make(chan bool)
 	}
-	t.done[key] = done
+	t.done[n.String()] = done
 	return done
 }

-// GetBackoff records the number of times it has seen a TaskRun and calculates an
-// appropriate backoff deadline based on that count. Only one backoff per TaskRun
+// GetBackoff records the number of times it has seen n and calculates an
+// appropriate backoff deadline based on that count. Only one backoff per n
 // may be active at any moment. Requests for a new backoff in the face of an
 // existing one will be ignored and details of the existing backoff will be returned
-// instead. Further, if a calculated backoff time is after the timeout of the TaskRun
+// instead. Further, if a calculated backoff time is after the timeout of the run
 // then the time of the timeout will be returned instead.
 //
 // Returned values are a backoff struct containing a NumAttempts field with the
-// number of attempts performed for this TaskRun and a NextAttempt field
+// number of attempts performed for this n and a NextAttempt field
 // describing the time at which the next attempt should be performed.
-// Additionally a boolean is returned indicating whether a backoff for the
-// TaskRun is already in progress.
+// Additionally a boolean is returned indicating whether a backoff for n
+// is already in progress.
+func (t *Handler) GetBackoff(n types.NamespacedName, startTime metav1.Time, timeout metav1.Duration) (Backoff, bool) {
 	t.backoffsMut.Lock()
 	defer t.backoffsMut.Unlock()
-	b := t.backoffs[tr.GetRunKey()]
+	b := t.backoffs[n.String()]
 	if time.Now().Before(b.NextAttempt) {
 		return b, true
 	}
 	b.NumAttempts++
 	b.NextAttempt = time.Now().Add(backoffDuration(b.NumAttempts, rand.Intn))
-	timeoutDeadline := tr.Status.StartTime.Time.Add(tr.Spec.Timeout.Duration)
+	timeoutDeadline := startTime.Time.Add(timeout.Duration)
 	if timeoutDeadline.Before(b.NextAttempt) {
 		b.NextAttempt = timeoutDeadline
 	}
-	t.backoffs[tr.GetRunKey()] = b
+	t.backoffs[n.String()] = b
 	return b, false
 }

@@ -179,7 +169,7 @@ func backoffDuration(count uint, jf jitterFunc) time.Duration {

 // checkPipelineRunTimeouts function creates goroutines to wait for pipelinerun to
 // finish/timeout in a given namespace
-func (t *Handler) checkPipelineRunTimeouts(namespace string, pipelineclientset clientset.Interface) {
+func (t *Handler) checkPipelineRunTimeouts(ctx context.Context, namespace string, pipelineclientset clientset.Interface) {
 	pipelineRuns, err := pipelineclientset.TektonV1beta1().PipelineRuns(namespace).List(metav1.ListOptions{})
 	if err != nil {
 		t.logger.Errorf("Can't get pipelinerun list in namespace %s: %s", namespace, err)
@@ -187,18 +177,19 @@ func (t *Handler) checkPipelineRunTimeouts(namespace string, pipelineclientset c
 	}
 	for _, pipelineRun := range pipelineRuns.Items {
 		pipelineRun := pipelineRun
+		pipelineRun.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))
 		if pipelineRun.IsDone() || pipelineRun.IsCancelled() {
 			continue
 		}
 		if pipelineRun.HasStarted() {
-			go t.WaitPipelineRun(&pipelineRun, pipelineRun.Status.StartTime)
+			go t.Wait(pipelineRun.GetNamespacedName(), *pipelineRun.Status.StartTime, *pipelineRun.Spec.Timeout)
 		}
 	}
 }

 // CheckTimeouts function iterates through a given namespace or all namespaces
 // (if empty string) and calls corresponding taskrun/pipelinerun timeout functions
-func (t *Handler) CheckTimeouts(namespace string, kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) {
+func (t *Handler) CheckTimeouts(ctx context.Context, namespace string, kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) {
 	// scoped namespace
 	namespaceNames := []string{namespace}
 	// all namespaces
@@ -215,14 +206,14 @@ func (t *Handler) CheckTimeouts(namespace string, kubeclientset kubernetes.Inter
 	}

 	for _, namespace := range namespaceNames {
-		t.checkTaskRunTimeouts(namespace, pipelineclientset)
-		t.checkPipelineRunTimeouts(namespace, pipelineclientset)
+		t.checkTaskRunTimeouts(ctx, namespace, pipelineclientset)
+		t.checkPipelineRunTimeouts(ctx, namespace, pipelineclientset)
 	}
 }

 // checkTaskRunTimeouts function creates goroutines to wait for pipelinerun to
 // finish/timeout in a given namespace
-func (t *Handler) checkTaskRunTimeouts(namespace string, pipelineclientset clientset.Interface) {
+func (t *Handler) checkTaskRunTimeouts(ctx context.Context, namespace string, pipelineclientset clientset.Interface) {
 	taskruns, err := pipelineclientset.TektonV1beta1().TaskRuns(namespace).List(metav1.ListOptions{})
 	if err != nil {
 		t.logger.Errorf("Can't get taskrun list in namespace %s: %s", namespace, err)
@@ -230,83 +221,57 @@ func (t *Handler) checkTaskRunTimeouts(namespace string, pipelineclientset clien
 	}
 	for _, taskrun := range taskruns.Items {
 		taskrun := taskrun
+		taskrun.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))
 		if taskrun.IsDone() || taskrun.IsCancelled() {
 			continue
 		}
 		if taskrun.HasStarted() {
-			go t.WaitTaskRun(&taskrun, taskrun.Status.StartTime)
+			go t.Wait(taskrun.GetNamespacedName(), *taskrun.Status.StartTime, *taskrun.Spec.Timeout)
 		}
 	}
 }
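The new `SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))` calls above matter because runs listed straight from the API server may never have been through defaulting, and `Wait` now dereferences `Spec.Timeout` unconditionally. A sketch of the one field that matters here — the 60-minute value is an assumption standing in for `config.DefaultTimeoutMinutes`:

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// spec stands in for a TaskRunSpec fetched straight from the API server,
// where Timeout may never have been defaulted.
type spec struct{ Timeout *metav1.Duration }

// setDefaults mirrors what SetDefaults does for the one field Wait now
// dereferences: fill in the default timeout when none is set.
func (s *spec) setDefaults() {
	if s.Timeout == nil {
		s.Timeout = &metav1.Duration{Duration: 60 * time.Minute}
	}
}

func main() {
	s := &spec{}
	s.setDefaults()
	fmt.Println(s.Timeout.Duration) // safe to dereference after defaulting
}
```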

-// WaitTaskRun function creates a blocking function for taskrun to wait for
-// 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out, which is
-// determined by checking if the tr's timeout has occurred since the startTime
-func (t *Handler) WaitTaskRun(tr *v1beta1.TaskRun, startTime *metav1.Time) {
-	var timeout time.Duration
-	if tr.Spec.Timeout == nil {
-		timeout = config.DefaultTimeoutMinutes * time.Minute
-	} else {
-		timeout = tr.Spec.Timeout.Duration
-	}
-	t.waitRun(tr, timeout, startTime, t.taskRunCallbackFunc)
-}
-
-// WaitPipelineRun function creates a blocking function for pipelinerun to wait for
-// 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out which is
-// determined by checking if the tr's timeout has occurred since the startTime
-func (t *Handler) WaitPipelineRun(pr *v1beta1.PipelineRun, startTime *metav1.Time) {
-	var timeout time.Duration
-	if pr.Spec.Timeout == nil {
-		timeout = config.DefaultTimeoutMinutes * time.Minute
-	} else {
-		timeout = pr.Spec.Timeout.Duration
-	}
-	t.waitRun(pr, timeout, startTime, t.pipelineRunCallbackFunc)
-}
-
-func (t *Handler) waitRun(runObj StatusKey, timeout time.Duration, startTime *metav1.Time, callback func(interface{})) {
-	if startTime == nil {
-		t.logger.Errorf("startTime must be specified in order for a timeout to be calculated accurately for %s", runObj.GetRunKey())
+// Wait creates a blocking function for n to wait for
+// 1. Stop signal, 2. Completion, or 3. Timeout, which is
+// determined by checking if the timeout has elapsed since the startTime
+func (t *Handler) Wait(n types.NamespacedName, startTime metav1.Time, timeout metav1.Duration) {
+	if t.callbackFunc == nil {
+		t.logger.Errorf("somehow the timeout handler was not initialized with a callback function")
 		return
 	}
-	if callback == nil {
-		callback = defaultFunc
-	}
 	runtime := time.Since(startTime.Time)
-	t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime.Time, timeout, runtime)
-	defer t.Release(runObj)
-	t.setTimer(runObj, timeout-runtime, callback)
+	t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", n.String(), startTime.Time, timeout, runtime)
+	defer t.Release(n)
+	t.setTimer(n, timeout.Duration-runtime, t.callbackFunc)
 }

-// SetTaskRunTimer creates a blocking function for taskrun to wait for
+// SetTimer creates a blocking function for n to wait for
 // 1. Stop signal, 2. TaskRun to complete or 3. a given Duration to elapse.
 //
 // Since the timer's duration is a parameter rather than being tied to
-// the lifetime of the TaskRun no resources are released after the timer
-// fires. It is the caller's responsibility to Release() the TaskRun when
+// the lifetime of the run object no resources are released after the timer
+// fires. It is the caller's responsibility to Release() the run when
 // work with it has completed.
-func (t *Handler) SetTaskRunTimer(tr *v1beta1.TaskRun, d time.Duration) {
-	callback := t.taskRunCallbackFunc
-	if callback == nil {
-		t.logger.Errorf("attempted to set a timer for %q but no task run callback has been assigned", tr.Name)
+func (t *Handler) SetTimer(n types.NamespacedName, d time.Duration) {
+	if t.callbackFunc == nil {
+		t.logger.Errorf("somehow the timeout handler was not initialized with a callback function")
 		return
 	}
-	t.setTimer(tr, d, callback)
+	t.setTimer(n, d, t.callbackFunc)
 }

-func (t *Handler) setTimer(runObj StatusKey, timeout time.Duration, callback func(interface{})) {
-	done := t.getOrCreateDoneChan(runObj)
+func (t *Handler) setTimer(n types.NamespacedName, timeout time.Duration, callback func(types.NamespacedName)) {
+	done := t.getOrCreateDoneChan(n)
 	started := time.Now()
 	select {
 	case <-t.stopCh:
-		t.logger.Infof("stopping timer for %q", runObj.GetRunKey())
+		t.logger.Infof("stopping timer for %q", n.String())
 		return
 	case <-done:
-		t.logger.Infof("%q finished, stopping timer", runObj.GetRunKey())
+		t.logger.Infof("%q finished, stopping timer", n.String())
 		return
 	case <-time.After(timeout):
-		t.logger.Infof("timer for %q has activated after %s", runObj.GetRunKey(), time.Since(started).String())
-		callback(runObj)
+		t.logger.Infof("timer for %q has activated after %s", n.String(), time.Since(started).String())
+		callback(n)
 	}
 }
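The heart of the handler is unchanged by this refactor: `setTimer` still races a stop signal, a completion signal, and a timer, and only the timer branch fires the callback. A reduced, runnable sketch of that select:

```go
package main

import (
	"fmt"
	"time"
)

// A stripped-down version of setTimer's select: whichever of stop,
// completion, or timeout happens first wins, and only the timeout
// branch invokes the callback.
func setTimer(stopCh <-chan struct{}, done <-chan bool, timeout time.Duration, callback func()) {
	select {
	case <-stopCh:
		fmt.Println("controller stopping; timer abandoned")
	case <-done:
		fmt.Println("run finished; timer cancelled")
	case <-time.After(timeout):
		callback()
	}
}

func main() {
	done := make(chan bool)
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(done) // Release closes the done channel registered for the run's key
	}()
	setTimer(make(chan struct{}), done, time.Second, func() { fmt.Println("timed out") })
}
```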
diff --git a/pkg/timeout/handler_test.go b/pkg/timeout/handler_test.go
index d3ae120e08a..e0260c00960 100644
--- a/pkg/timeout/handler_test.go
+++ b/pkg/timeout/handler_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package timeout

 import (
+	"context"
 	"fmt"
 	"sync"
 	"testing"
@@ -31,6 +32,7 @@ import (
 	"go.uber.org/zap/zaptest/observer"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"knative.dev/pkg/apis"
 )
@@ -104,13 +106,12 @@ func TestTaskRunCheckTimeouts(t *testing.T) {
 	th := NewHandler(stopCh, zap.New(observer).Sugar())
 	gotCallback := sync.Map{}
-	f := func(tr interface{}) {
-		trNew := tr.(*v1beta1.TaskRun)
-		gotCallback.Store(trNew.Name, struct{}{})
+	f := func(n types.NamespacedName) {
+		gotCallback.Store(n.Name, struct{}{})
 	}

-	th.SetTaskRunCallbackFunc(f)
-	th.CheckTimeouts(allNs, c.Kube, c.Pipeline)
+	th.SetCallbackFunc(f)
+	th.CheckTimeouts(context.Background(), testNs, c.Kube, c.Pipeline)

 	for _, tc := range []struct {
 		name string
@@ -194,13 +195,15 @@ func TestTaskRunSingleNamespaceCheckTimeouts(t *testing.T) {
 	th := NewHandler(stopCh, zap.New(observer).Sugar())
 	gotCallback := sync.Map{}
-	f := func(tr interface{}) {
-		trNew := tr.(*v1beta1.TaskRun)
-		gotCallback.Store(trNew.Name, struct{}{})
+	f := func(n types.NamespacedName) {
+		gotCallback.Store(n.Name, struct{}{})
 	}

-	th.SetTaskRunCallbackFunc(f)
-	th.CheckTimeouts(testNs, c.Kube, c.Pipeline)
+	th.SetCallbackFunc(f)
+	// Note that since f843899a11d5d09b29fac750f72f4a7e4882f615 CheckTimeouts is always called
+	// with a namespace so there is no reason to maintain all namespaces functionality;
+	// however in #2905 we should remove CheckTimeouts completely
+	th.CheckTimeouts(context.Background(), "", c.Kube, c.Pipeline)

 	for _, tc := range []struct {
 		name string
@@ -308,13 +311,12 @@ func TestPipelinRunCheckTimeouts(t *testing.T) {
 	th := NewHandler(stopCh, zap.New(observer).Sugar())
 	gotCallback := sync.Map{}
-	f := func(pr interface{}) {
-		prNew := pr.(*v1beta1.PipelineRun)
-		gotCallback.Store(prNew.Name, struct{}{})
+	f := func(n types.NamespacedName) {
+		gotCallback.Store(n.Name, struct{}{})
 	}

-	th.SetPipelineRunCallbackFunc(f)
-	th.CheckTimeouts(allNs, c.Kube, c.Pipeline)
+	th.SetCallbackFunc(f)
+	th.CheckTimeouts(context.Background(), allNs, c.Kube, c.Pipeline)

 	for _, tc := range []struct {
 		name string
 		pr   *v1beta1.PipelineRun
@@ -393,7 +395,7 @@ func TestWithNoFunc(t *testing.T) {
 			t.Fatal("Expected CheckTimeouts function not to panic")
 		}
 	}()
-	testHandler.CheckTimeouts(allNs, c.Kube, c.Pipeline)
+	testHandler.CheckTimeouts(context.Background(), allNs, c.Kube, c.Pipeline)

 }

@@ -402,7 +404,7 @@ func TestWithNoFunc(t *testing.T)
 func TestSetTaskRunTimer(t *testing.T) {
 	taskRun := tb.TaskRun("test-taskrun-arbitrary-timer", tb.TaskRunNamespace(testNs), tb.TaskRunSpec(
 		tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")),
-		tb.TaskRunTimeout(2*time.Second),
+		tb.TaskRunTimeout(50*time.Millisecond),
 	), tb.TaskRunStatus(tb.StatusCondition(apis.Condition{
 		Type:   apis.ConditionSucceeded,
 		Status: corev1.ConditionUnknown}),
 	))
@@ -415,11 +417,11 @@ func TestSetTaskRunTimer(t *testing.T) {
 	timerDuration := 50 * time.Millisecond
 	timerFailDeadline := 100 * time.Millisecond
 	doneCh := make(chan struct{})
-	callback := func(_ interface{}) {
+	f := func(_ types.NamespacedName) {
 		close(doneCh)
 	}
-	testHandler.SetTaskRunCallbackFunc(callback)
-	go testHandler.SetTaskRunTimer(taskRun, timerDuration)
+	testHandler.SetCallbackFunc(f)
+	go testHandler.SetTimer(taskRun.GetNamespacedName(), timerDuration)
 	select {
 	case <-doneCh:
 		// The task run timer executed before the failure deadline
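Taken together, the new surface is one callback type and `Wait`/`SetTimer`/`Release`/`GetBackoff` all keyed by `types.NamespacedName`. A closing sketch of how a caller drives it, with a toy handler standing in for `timeout.Handler` (the real one also honors the stop and done channels shown earlier):

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// toyHandler stands in for timeout.Handler, reduced to the new API shape.
type toyHandler struct{ callbackFunc func(types.NamespacedName) }

func (t *toyHandler) SetCallbackFunc(f func(types.NamespacedName)) { t.callbackFunc = f }

// Wait fires the callback once the remaining portion of the timeout elapses.
func (t *toyHandler) Wait(n types.NamespacedName, startTime metav1.Time, timeout metav1.Duration) {
	time.Sleep(timeout.Duration - time.Since(startTime.Time))
	t.callbackFunc(n)
}

func main() {
	th := &toyHandler{}
	th.SetCallbackFunc(func(n types.NamespacedName) { fmt.Println("requeue", n.String()) })
	start := metav1.NewTime(time.Now())
	th.Wait(types.NamespacedName{Namespace: "default", Name: "build"},
		start, metav1.Duration{Duration: 50 * time.Millisecond})
}
```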