diff --git a/cmd/controller/main.go b/cmd/controller/main.go index e0ed68da096..dbb168042c1 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -113,7 +113,7 @@ func main() { pipelineInformer := pipelineInformerFactory.Tekton().V1alpha1().Pipelines() pipelineRunInformer := pipelineInformerFactory.Tekton().V1alpha1().PipelineRuns() - timeoutHandler := reconciler.NewTimeoutHandler(kubeClient, pipelineClient, stopCh, logger) + timeoutHandler := reconciler.NewTimeoutHandler(stopCh, logger) trc := taskrun.NewController(opt, taskRunInformer, @@ -141,7 +141,7 @@ func main() { } timeoutHandler.SetTaskRunCallbackFunc(trc.Enqueue) timeoutHandler.SetPipelineRunCallbackFunc(prc.Enqueue) - timeoutHandler.CheckTimeouts() + timeoutHandler.CheckTimeouts(kubeClient, pipelineClient) // Watch the logging config map and dynamically update logging levels. configMapWatcher.Watch(logging.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logging.ControllerLogKey)) diff --git a/pkg/reconciler/timeout_handler.go b/pkg/reconciler/timeout_handler.go index e8432d9ca74..e995c69a15d 100644 --- a/pkg/reconciler/timeout_handler.go +++ b/pkg/reconciler/timeout_handler.go @@ -1,6 +1,8 @@ package reconciler import ( + "math" + "math/rand" "sync" "time" @@ -12,12 +14,14 @@ import ( "k8s.io/client-go/kubernetes" ) -var ( - defaultFunc = func(i interface{}) {} +const ( + defaultTimeout = 10 * time.Minute + maxBackoffSeconds = 120 ) -const ( - defaultTimeout = 10 * time.Minute +var ( + defaultFunc = func(i interface{}) {} + maxBackoffExponent = math.Ceil(math.Log2(maxBackoffSeconds)) ) // StatusKey interface to be implemented by Taskrun Pipelinerun types @@ -25,32 +29,41 @@ type StatusKey interface { GetRunKey() string } +// backoff contains state of exponential backoff for a given StatusKey +type backoff struct { + // NumAttempts reflects the number of times a given StatusKey has been delayed + NumAttempts uint + // NextAttempt is the point in time at which this backoff expires + NextAttempt time.Time +} + +// jitterFunc is a func applied to a computed backoff duration to remove uniformity +// from its results. A jitterFunc receives the number of seconds calculated by a +// backoff algorithm and returns the "jittered" result. +type jitterFunc func(numSeconds int) (jitteredSeconds int) + -// TimeoutSet contains required k8s interfaces to handle build timeouts +// TimeoutSet tracks timeout and backoff state for TaskRuns and PipelineRuns type TimeoutSet struct { logger *zap.SugaredLogger - kubeclientset kubernetes.Interface - pipelineclientset clientset.Interface taskRunCallbackFunc func(interface{}) pipelineRunCallbackFunc func(interface{}) stopCh <-chan struct{} done map[string]chan bool doneMut sync.Mutex + backoffs map[string]backoff + backoffsMut sync.Mutex } // NewTimeoutHandler returns TimeoutSet filled structure func NewTimeoutHandler( - kubeclientset kubernetes.Interface, - pipelineclientset clientset.Interface, stopCh <-chan struct{}, logger *zap.SugaredLogger, ) *TimeoutSet { return &TimeoutSet{ - kubeclientset: kubeclientset, - pipelineclientset: pipelineclientset, - stopCh: stopCh, - done: make(map[string]chan bool), - doneMut: sync.Mutex{}, - logger: logger, + stopCh: stopCh, + done: make(map[string]chan bool), + backoffs: make(map[string]backoff), + logger: logger, } } @@ -64,16 +77,20 @@ func (t *TimeoutSet) SetPipelineRunCallbackFunc(f func(interface{})) { t.pipelineRunCallbackFunc = f } -// Release function deletes key from timeout map +// Release deletes channels and data that are specific to a StatusKey object.
func (t *TimeoutSet) Release(runObj StatusKey) { key := runObj.GetRunKey() t.doneMut.Lock() defer t.doneMut.Unlock() + t.backoffsMut.Lock() + defer t.backoffsMut.Unlock() + if finished, ok := t.done[key]; ok { delete(t.done, key) close(finished) } + delete(t.backoffs, key) } func (t *TimeoutSet) getOrCreateFinishedChan(runObj StatusKey) chan bool { @@ -101,10 +118,52 @@ func GetTimeout(d *metav1.Duration) time.Duration { return timeout } +// GetBackoff records the number of times it has seen a TaskRun and calculates an +// appropriate backoff deadline based on that count. Only one backoff per TaskRun +// may be active at any moment. Requests for a new backoff in the face of an +// existing one will be ignored and details of the existing backoff will be returned +// instead. Further, if a calculated backoff time is after the timeout of the TaskRun +// then the time of the timeout will be returned instead. +// +// Returned values are a backoff struct containing a NumAttempts field with the +// number of attempts performed for this TaskRun and a NextAttempt field +// describing the time at which the next attempt should be performed. +// Additionally a boolean is returned indicating whether a backoff for the +// TaskRun is already in progress. +func (t *TimeoutSet) GetBackoff(tr *v1alpha1.TaskRun) (backoff, bool) { + t.backoffsMut.Lock() + defer t.backoffsMut.Unlock() + b := t.backoffs[tr.GetRunKey()] + if time.Now().Before(b.NextAttempt) { + return b, true + } + b.NumAttempts += 1 + b.NextAttempt = time.Now().Add(backoffDuration(b.NumAttempts, rand.Intn)) + timeoutDeadline := tr.Status.StartTime.Time.Add(GetTimeout(tr.Spec.Timeout)) + if timeoutDeadline.Before(b.NextAttempt) { + b.NextAttempt = timeoutDeadline + } + t.backoffs[tr.GetRunKey()] = b + return b, false +} + +func backoffDuration(count uint, jf jitterFunc) time.Duration { + exp := float64(count) + if exp > maxBackoffExponent { + exp = maxBackoffExponent + } + seconds := int(math.Exp2(exp)) + jittered := 1 + jf(seconds) + if jittered > maxBackoffSeconds { + jittered = maxBackoffSeconds + } + return time.Duration(jittered) * time.Second +} + // checkPipelineRunTimeouts function creates goroutines to wait for pipelinerun to // finish/timeout in a given namespace -func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string) { - pipelineRuns, err := t.pipelineclientset.TektonV1alpha1().PipelineRuns(namespace).List(metav1.ListOptions{}) +func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string, pipelineclientset clientset.Interface) { + pipelineRuns, err := pipelineclientset.TektonV1alpha1().PipelineRuns(namespace).List(metav1.ListOptions{}) if err != nil { t.logger.Errorf("Can't get pipelinerun list in namespace %s: %s", namespace, err) return @@ -122,22 +181,22 @@ func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string) { // CheckTimeouts function iterates through all namespaces and calls corresponding // taskrun/pipelinerun timeout functions -func (t *TimeoutSet) CheckTimeouts() { - namespaces, err := t.kubeclientset.CoreV1().Namespaces().List(metav1.ListOptions{}) +func (t *TimeoutSet) CheckTimeouts(kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) { + namespaces, err := kubeclientset.CoreV1().Namespaces().List(metav1.ListOptions{}) if err != nil { t.logger.Errorf("Can't get namespaces list: %s", err) return } for _, namespace := range namespaces.Items { - t.checkTaskRunTimeouts(namespace.GetName()) - t.checkPipelineRunTimeouts(namespace.GetName()) + t.checkTaskRunTimeouts(namespace.GetName(), 
pipelineclientset) + t.checkPipelineRunTimeouts(namespace.GetName(), pipelineclientset) } } -// checkTaskRunTimeouts function creates goroutines to wait for pipelinerun to +// checkTaskRunTimeouts function creates goroutines to wait for taskrun to // finish/timeout in a given namespace -func (t *TimeoutSet) checkTaskRunTimeouts(namespace string) { - taskruns, err := t.pipelineclientset.TektonV1alpha1().TaskRuns(namespace).List(metav1.ListOptions{}) +func (t *TimeoutSet) checkTaskRunTimeouts(namespace string, pipelineclientset clientset.Interface) { + taskruns, err := pipelineclientset.TektonV1alpha1().TaskRuns(namespace).List(metav1.ListOptions{}) if err != nil { t.logger.Errorf("Can't get taskrun list in namespace %s: %s", namespace, err) return @@ -172,26 +231,43 @@ func (t *TimeoutSet) waitRun(runObj StatusKey, timeout time.Duration, startTime t.logger.Errorf("startTime must be specified in order for a timeout to be calculated accurately for %s", runObj.GetRunKey()) return } + if callback == nil { + callback = defaultFunc + } runtime := time.Since(startTime.Time) - finished := t.getOrCreateFinishedChan(runObj) - + t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime.Time, timeout, runtime) defer t.Release(runObj) + t.setTimer(runObj, timeout-runtime, callback) +} - t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime.Time, timeout, runtime) +// SetTaskRunTimer blocks until the stop signal is received, the TaskRun completes, +// or the given Duration elapses, at which point the TaskRun callback is invoked. +// +// Since the timer's duration is a parameter rather than being tied to +// the lifetime of the TaskRun, no resources are released after the timer +// fires. It is the caller's responsibility to Release() the TaskRun when +// work with it has completed.
+func (t *TimeoutSet) SetTaskRunTimer(tr *v1alpha1.TaskRun, d time.Duration) { + callback := t.taskRunCallbackFunc + if callback == nil { + t.logger.Errorf("attempted to set a timer for %q but no task run callback has been assigned", tr.Name) + return + } + t.setTimer(tr, d, callback) +} +func (t *TimeoutSet) setTimer(runObj StatusKey, timeout time.Duration, callback func(interface{})) { + finished := t.getOrCreateFinishedChan(runObj) + started := time.Now() select { case <-t.stopCh: - t.logger.Infof("Stopping timeout timer for %s", runObj.GetRunKey()) + t.logger.Infof("stopping timer for %q", runObj.GetRunKey()) return case <-finished: - t.logger.Infof("%s finished, stopping the timeout timer", runObj.GetRunKey()) + t.logger.Infof("%q finished, stopping timer", runObj.GetRunKey()) return - case <-time.After(timeout - runtime): - t.logger.Infof("Timeout timer for %s has timed out (started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime, timeout, time.Since(startTime.Time)) - if callback != nil { - callback(runObj) - } else { - defaultFunc(runObj) - } + case <-time.After(timeout): + t.logger.Infof("timer for %q has activated after %s", runObj.GetRunKey(), time.Since(started).String()) + callback(runObj) } } diff --git a/pkg/reconciler/timeout_handler_test.go b/pkg/reconciler/timeout_handler_test.go index a061d6bbeb3..6ef78687f1c 100644 --- a/pkg/reconciler/timeout_handler_test.go +++ b/pkg/reconciler/timeout_handler_test.go @@ -69,7 +69,7 @@ func TestTaskRunCheckTimeouts(t *testing.T) { defer close(stopCh) c, _ := test.SeedTestData(t, d) observer, _ := observer.New(zap.InfoLevel) - th := NewTimeoutHandler(c.Kube, c.Pipeline, stopCh, zap.New(observer).Sugar()) + th := NewTimeoutHandler(stopCh, zap.New(observer).Sugar()) gotCallback := sync.Map{} f := func(tr interface{}) { trNew := tr.(*v1alpha1.TaskRun) @@ -77,7 +77,7 @@ func TestTaskRunCheckTimeouts(t *testing.T) { } th.SetTaskRunCallbackFunc(f) - th.CheckTimeouts() + th.CheckTimeouts(c.Kube, c.Pipeline) for _, tc := range []struct { name string @@ -172,7 +172,7 @@ func TestPipelinRunCheckTimeouts(t *testing.T) { c, _ := test.SeedTestData(t, d) stopCh := make(chan struct{}) observer, _ := observer.New(zap.InfoLevel) - th := NewTimeoutHandler(c.Kube, c.Pipeline, stopCh, zap.New(observer).Sugar()) + th := NewTimeoutHandler(stopCh, zap.New(observer).Sugar()) defer close(stopCh) gotCallback := sync.Map{} @@ -182,7 +182,7 @@ func TestPipelinRunCheckTimeouts(t *testing.T) { } th.SetPipelineRunCallbackFunc(f) - th.CheckTimeouts() + th.CheckTimeouts(c.Kube, c.Pipeline) for _, tc := range []struct { name string pr *v1alpha1.PipelineRun @@ -247,7 +247,7 @@ func TestWithNoFunc(t *testing.T) { stopCh := make(chan struct{}) c, _ := test.SeedTestData(t, d) observer, _ := observer.New(zap.InfoLevel) - testHandler := NewTimeoutHandler(c.Kube, c.Pipeline, stopCh, zap.New(observer).Sugar()) + testHandler := NewTimeoutHandler(stopCh, zap.New(observer).Sugar()) defer func() { // this delay will ensure there is no race condition between stopCh/ timeout channel getting triggered time.Sleep(10 * time.Millisecond) @@ -256,6 +256,98 @@ func TestWithNoFunc(t *testing.T) { t.Fatal("Expected CheckTimeouts function not to panic") } }() - testHandler.CheckTimeouts() + testHandler.CheckTimeouts(c.Kube, c.Pipeline) } + +// TestGetTimeout checks that the timeout calculated for a given taskrun falls +// back to the default taskrun timeout when none is provided. 
+func TestGetTimeout(t *testing.T) { + testCases := []struct { + description string + inputDuration *metav1.Duration + expectedDuration time.Duration + }{ + { + description: "returns same duration as input when input is not nil", + inputDuration: &metav1.Duration{Duration: 2 * time.Second}, + expectedDuration: 2 * time.Second, + }, + { + description: "falls back to the default timeout when a TaskRun's timeout is nil", + inputDuration: nil, + expectedDuration: defaultTimeout, + }, + } + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + receivedDuration := GetTimeout(tc.inputDuration) + if receivedDuration != tc.expectedDuration { + t.Errorf("expected %q received %q", tc.expectedDuration.String(), receivedDuration.String()) + } + }) + } +} + +// TestSetTaskRunTimer checks that the SetTaskRunTimer method correctly calls the TaskRun +// callback after a set amount of time. +func TestSetTaskRunTimer(t *testing.T) { + taskRun := tb.TaskRun("test-taskrun-arbitrary-timer", testNs, tb.TaskRunSpec( + tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")), + tb.TaskRunTimeout(2*time.Second), + ), tb.TaskRunStatus(tb.Condition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown}), + tb.TaskRunStartTime(time.Now().Add(-10*time.Second)), + )) + + stopCh := make(chan struct{}) + observer, _ := observer.New(zap.InfoLevel) + testHandler := NewTimeoutHandler(stopCh, zap.New(observer).Sugar()) + timerDuration := 50 * time.Millisecond + timerFailDeadline := 100 * time.Millisecond + doneCh := make(chan struct{}) + callback := func(_ interface{}) { + close(doneCh) + } + testHandler.SetTaskRunCallbackFunc(callback) + go testHandler.SetTaskRunTimer(taskRun, timerDuration) + select { + case <-doneCh: + // The task run timer executed before the failure deadline + case <-time.After(timerFailDeadline): + t.Errorf("timer did not execute task run callback func within expected time") + } +} + +// TestBackoffDuration asserts that the backoffDuration func returns Durations +// within the timeout handler's bounds.
+func TestBackoffDuration(t *testing.T) { + testcases := []struct { + description string + inputCount uint + jitterFunc func(int) int + expectedDuration time.Duration + }{ + { + description: "an input count that is too large is rounded to the maximum allowed exponent", + inputCount: uint(maxBackoffExponent + 1), + jitterFunc: func(in int) int { return in }, + expectedDuration: maxBackoffSeconds * time.Second, + }, + { + description: "a jittered number of seconds that is above the maximum allowed is constrained", + inputCount: 1, + jitterFunc: func(in int) int { return maxBackoffSeconds + 1 }, + expectedDuration: maxBackoffSeconds * time.Second, + }, + } + for _, tc := range testcases { + t.Run(tc.description, func(t *testing.T) { + result := backoffDuration(tc.inputCount, tc.jitterFunc) + if result != tc.expectedDuration { + t.Errorf("expected %q received %q", tc.expectedDuration.String(), result.String()) + } + }) + } +} diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go index 60bb0d37f82..95cc227d782 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go @@ -55,7 +55,7 @@ func getPipelineRunController(t *testing.T, d test.Data, recorder record.EventRe stopCh := make(chan struct{}) configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace()) logger := zap.New(observer).Sugar() - th := reconciler.NewTimeoutHandler(c.Kube, c.Pipeline, stopCh, logger) + th := reconciler.NewTimeoutHandler(stopCh, logger) return test.TestAssets{ Controller: NewController( reconciler.Options{ diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun.go b/pkg/reconciler/v1alpha1/taskrun/taskrun.go index 813fca5b1d2..5894380a18a 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun.go @@ -310,30 +310,9 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error return err } if pod == nil { - // Pod is not present, create pod. 
pod, err = c.createPod(tr, rtr) if err != nil { - // This Run has failed, so we need to mark it as failed and stop reconciling it - var reason, msg string - if isExceededResourceQuotaError(err) { - reason = reasonExceededResourceQuota - msg = getExceededResourcesMessage(tr) - } else { - reason = reasonCouldntGetTask - if tr.Spec.TaskRef != nil { - msg = fmt.Sprintf("Missing or invalid Task %s/%s", tr.Namespace, tr.Spec.TaskRef.Name) - } else { - msg = fmt.Sprintf("Invalid TaskSpec") - } - } - tr.Status.SetCondition(&apis.Condition{ - Type: apis.ConditionSucceeded, - Status: corev1.ConditionFalse, - Reason: reason, - Message: fmt.Sprintf("%s: %v", msg, err), - }) - c.Recorder.Eventf(tr, corev1.EventTypeWarning, "BuildCreationFailed", "Failed to create build pod %q: %v", tr.Name, err) - c.Logger.Errorf("Failed to create build pod for task %q :%v", err, tr.Name) + c.handlePodCreationError(tr, err) return nil } go c.timeoutHandler.WaitTaskRun(tr, tr.Status.StartTime) @@ -424,6 +403,37 @@ func updateStatusFromPod(taskRun *v1alpha1.TaskRun, pod *corev1.Pod, resourceLis updateTaskRunResourceResult(taskRun, pod, resourceLister, kubeclient, logger) } +// handlePodCreationError sets the TaskRun's Succeeded condition based on why pod creation failed: exceeded-quota errors leave the condition Unknown and schedule a backed-off retry, while all other errors mark the TaskRun as failed. +func (c *Reconciler) handlePodCreationError(tr *v1alpha1.TaskRun, err error) { + var reason, msg string + var succeededStatus corev1.ConditionStatus + if isExceededResourceQuotaError(err) { + succeededStatus = corev1.ConditionUnknown + reason = reasonExceededResourceQuota + backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr) + if !currentlyBackingOff { + go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt)) + } + msg = fmt.Sprintf("%s, reattempted %d times", getExceededResourcesMessage(tr), backoff.NumAttempts) + } else { + succeededStatus = corev1.ConditionFalse + reason = reasonCouldntGetTask + if tr.Spec.TaskRef != nil { + msg = fmt.Sprintf("Missing or invalid Task %s/%s", tr.Namespace, tr.Spec.TaskRef.Name) + } else { + msg = "Invalid TaskSpec" + } + } + tr.Status.SetCondition(&apis.Condition{ + Type: apis.ConditionSucceeded, + Status: succeededStatus, + Reason: reason, + Message: fmt.Sprintf("%s: %v", msg, err), + }) + c.Recorder.Eventf(tr, corev1.EventTypeWarning, "BuildCreationFailed", "Failed to create build pod %q: %v", tr.Name, err) + c.Logger.Errorf("Failed to create build pod for task %q: %v", tr.Name, err) +} + func updateTaskRunResourceResult(taskRun *v1alpha1.TaskRun, pod *corev1.Pod, resourceLister listers.PipelineResourceLister, kubeclient kubernetes.Interface, logger *zap.SugaredLogger) { if resources.TaskRunHasOutputImageResource(resourceLister.PipelineResources(taskRun.Namespace).Get, taskRun) && taskRun.IsSuccessful() { for _, container := range pod.Spec.Containers { @@ -512,7 +521,6 @@ func (c *Reconciler) updateLabelsAndAnnotations(tr *v1alpha1.TaskRun) (*v1alpha1 return newTr, nil } - // createPod creates a Pod based on the Task's configuration, with pvcName as a volumeMount // TODO(dibyom): Refactor resource setup/templating logic to its own function in the resources package func (c *Reconciler) createPod(tr *v1alpha1.TaskRun, rtr *resources.ResolvedTaskResources) (*corev1.Pod, error) { @@ -613,9 +621,14 @@ func (c *Reconciler) checkTimeout(tr *v1alpha1.TaskRun, ts *v1alpha1.TaskSpec, d c.Logger.Infof("Checking timeout for TaskRun %q (startTime %s, timeout %s, runtime %s)", tr.Name, tr.Status.StartTime, timeout, runtime) if runtime > timeout { c.Logger.Infof("TaskRun %q is timeout (runtime %s over %s), deleting pod", tr.Name, runtime, timeout) - if err := dp(tr.Status.PodName,
&metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { - c.Logger.Errorf("Failed to terminate pod: %v", err) - return true, err + // tr.Status.PodName will be empty if the pod was never successfully created. This condition + // can be reached, for example, by the pod never being schedulable due to limits imposed by + // a namespace's ResourceQuota. + if tr.Status.PodName != "" { + if err := dp(tr.Status.PodName, &metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { + c.Logger.Errorf("Failed to terminate pod: %v", err) + return true, err + } } timeoutMsg := fmt.Sprintf("TaskRun %q failed to finish within %q", tr.Name, timeout.String()) diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go index 4ecfae85919..ca7e9a6ad91 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go @@ -15,6 +15,7 @@ package taskrun import ( "context" + "errors" "fmt" "strings" "testing" @@ -40,9 +41,11 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" corev1 "k8s.io/api/core/v1" + k8sapierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + k8sruntimeschema "k8s.io/apimachinery/pkg/runtime/schema" fakekubeclientset "k8s.io/client-go/kubernetes/fake" ktesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" @@ -192,7 +195,7 @@ func getTaskRunController(t *testing.T, d test.Data) test.TestAssets { configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace()) stopCh := make(chan struct{}) logger := zap.New(observer).Sugar() - th := reconciler.NewTimeoutHandler(c.Kube, c.Pipeline, stopCh, logger) + th := reconciler.NewTimeoutHandler(stopCh, logger) return test.TestAssets{ Controller: NewController( reconciler.Options{ @@ -1840,3 +1843,66 @@ func TestUpdateStatusFromPod(t *testing.T) { }) } } + +func TestHandlePodCreationError(t *testing.T) { + taskRun := tb.TaskRun("test-taskrun-pod-creation-failed", "foo", tb.TaskRunSpec( + tb.TaskRunTaskRef(simpleTask.Name), + ), tb.TaskRunStatus( + tb.TaskRunStartTime(time.Now()), + tb.Condition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown, + }), + )) + d := test.Data{ + TaskRuns: []*v1alpha1.TaskRun{taskRun}, + Tasks: []*v1alpha1.Task{simpleTask}, + } + testAssets := getTaskRunController(t, d) + c, ok := testAssets.Controller.Reconciler.(*Reconciler) + if !ok { + t.Errorf("failed to construct instance of taskrun reconciler") + return + } + + // Prevent backoff timer from starting + c.timeoutHandler.SetTaskRunCallbackFunc(nil) + + testcases := []struct { + description string + err error + expectedType apis.ConditionType + expectedStatus corev1.ConditionStatus + expectedReason string + }{ + { + description: "exceeded quota errors are surfaced in taskrun condition but do not fail taskrun", + err: k8sapierrors.NewForbidden(k8sruntimeschema.GroupResource{Group: "foo", Resource: "bar"}, "baz", errors.New("exceeded quota")), + expectedType: apis.ConditionSucceeded, + expectedStatus: corev1.ConditionUnknown, + expectedReason: reasonExceededResourceQuota, + }, + { + description: "errors other than exceeded quota fail the taskrun", + err: errors.New("this is a fatal error"), + expectedType: apis.ConditionSucceeded, + expectedStatus: corev1.ConditionFalse, + expectedReason: reasonCouldntGetTask, + }, + } + for _, tc := range testcases { + t.Run(tc.description, func(t 
*testing.T) { + c.handlePodCreationError(taskRun, tc.err) + foundCondition := false + for _, cond := range taskRun.Status.Conditions { + if cond.Type == tc.expectedType && cond.Status == tc.expectedStatus && cond.Reason == tc.expectedReason { + foundCondition = true + break + } + } + if !foundCondition { + t.Errorf("expected to find condition type %q, status %q and reason %q", tc.expectedType, tc.expectedStatus, tc.expectedReason) + } + }) + } +}
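Reviewer note (illustrative only, not part of the patch): the retry schedule produced by the new backoff logic can be sanity-checked with a small standalone program. The sketch below copies the capped-exponent and capped-jitter rules added in pkg/reconciler/timeout_handler.go; the main function and the fixed range of attempts are assumptions made purely for demonstration.

package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// Constants mirror those added to pkg/reconciler/timeout_handler.go.
const maxBackoffSeconds = 120

var maxBackoffExponent = math.Ceil(math.Log2(maxBackoffSeconds))

// backoffDuration reproduces the patch's calculation: 2^attempt seconds,
// with the exponent capped, a jitter applied, and a hard cap of maxBackoffSeconds.
func backoffDuration(count uint, jf func(int) int) time.Duration {
	exp := float64(count)
	if exp > maxBackoffExponent {
		exp = maxBackoffExponent
	}
	seconds := int(math.Exp2(exp))
	jittered := 1 + jf(seconds)
	if jittered > maxBackoffSeconds {
		jittered = maxBackoffSeconds
	}
	return time.Duration(jittered) * time.Second
}

func main() {
	// Callers in the patch pass rand.Intn as the jitter func; successive attempts
	// roughly double the delay until it saturates at maxBackoffSeconds.
	for attempt := uint(1); attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: next retry in %s\n", attempt, backoffDuration(attempt, rand.Intn))
	}
}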