diff --git a/cmd/controller/main.go b/cmd/controller/main.go
index e0ed68da096..dbb168042c1 100644
--- a/cmd/controller/main.go
+++ b/cmd/controller/main.go
@@ -113,7 +113,7 @@ func main() {
 
 	pipelineInformer := pipelineInformerFactory.Tekton().V1alpha1().Pipelines()
 	pipelineRunInformer := pipelineInformerFactory.Tekton().V1alpha1().PipelineRuns()
-	timeoutHandler := reconciler.NewTimeoutHandler(kubeClient, pipelineClient, stopCh, logger)
+	timeoutHandler := reconciler.NewTimeoutHandler(stopCh, logger)
 
 	trc := taskrun.NewController(opt,
 		taskRunInformer,
@@ -141,7 +141,7 @@ func main() {
 	}
 	timeoutHandler.SetTaskRunCallbackFunc(trc.Enqueue)
 	timeoutHandler.SetPipelineRunCallbackFunc(prc.Enqueue)
-	timeoutHandler.CheckTimeouts()
+	timeoutHandler.CheckTimeouts(kubeClient, pipelineClient)
 
 	// Watch the logging config map and dynamically update logging levels.
 	configMapWatcher.Watch(logging.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logging.ControllerLogKey))
diff --git a/pkg/reconciler/timeout_handler.go b/pkg/reconciler/timeout_handler.go
index e8432d9ca74..e995c69a15d 100644
--- a/pkg/reconciler/timeout_handler.go
+++ b/pkg/reconciler/timeout_handler.go
@@ -1,6 +1,8 @@
 package reconciler
 
 import (
+	"math"
+	"math/rand"
 	"sync"
 
 	"time"
@@ -12,12 +14,14 @@ import (
 	"k8s.io/client-go/kubernetes"
 )
 
-var (
-	defaultFunc = func(i interface{}) {}
+const (
+	defaultTimeout    = 10 * time.Minute
+	maxBackoffSeconds = 120
 )
 
-const (
-	defaultTimeout = 10 * time.Minute
+var (
+	defaultFunc        = func(i interface{}) {}
+	maxBackoffExponent = math.Ceil(math.Log2(maxBackoffSeconds))
 )
 
 // StatusKey interface to be implemented by Taskrun Pipelinerun types
@@ -25,32 +29,41 @@ type StatusKey interface {
 	GetRunKey() string
 }
 
+// backoff contains state of exponential backoff for a given StatusKey
+type backoff struct {
+	// NumAttempts reflects the number of times a given StatusKey has been delayed
+	NumAttempts uint
+	// NextAttempt is the point in time at which this backoff expires
+	NextAttempt time.Time
+}
+
+// jitterFunc is a func applied to a computed backoff duration to remove uniformity
+// from its results. A jitterFunc receives the number of seconds calculated by a
+// backoff algorithm and returns the "jittered" result.
+type jitterFunc func(numSeconds int) (jitteredSeconds int)
+
 // TimeoutSet contains required k8s interfaces to handle build timeouts
 type TimeoutSet struct {
 	logger                  *zap.SugaredLogger
-	kubeclientset           kubernetes.Interface
-	pipelineclientset       clientset.Interface
 	taskRunCallbackFunc     func(interface{})
 	pipelineRunCallbackFunc func(interface{})
 	stopCh                  <-chan struct{}
 	done                    map[string]chan bool
 	doneMut                 sync.Mutex
+	backoffs                map[string]backoff
+	backoffsMut             sync.Mutex
 }
 
 // NewTimeoutHandler returns TimeoutSet filled structure
 func NewTimeoutHandler(
-	kubeclientset kubernetes.Interface,
-	pipelineclientset clientset.Interface,
 	stopCh <-chan struct{},
 	logger *zap.SugaredLogger,
 ) *TimeoutSet {
 	return &TimeoutSet{
-		kubeclientset:     kubeclientset,
-		pipelineclientset: pipelineclientset,
-		stopCh:            stopCh,
-		done:              make(map[string]chan bool),
-		doneMut:           sync.Mutex{},
-		logger:            logger,
+		stopCh:   stopCh,
+		done:     make(map[string]chan bool),
+		backoffs: make(map[string]backoff),
+		logger:   logger,
 	}
 }
 
@@ -64,16 +77,20 @@ func (t *TimeoutSet) SetPipelineRunCallbackFunc(f func(interface{})) {
 	t.pipelineRunCallbackFunc = f
 }
 
-// Release function deletes key from timeout map
+// Release deletes channels and data that are specific to a StatusKey object.
 func (t *TimeoutSet) Release(runObj StatusKey) {
 	key := runObj.GetRunKey()
 	t.doneMut.Lock()
 	defer t.doneMut.Unlock()
 
+	t.backoffsMut.Lock()
+	defer t.backoffsMut.Unlock()
+
 	if finished, ok := t.done[key]; ok {
 		delete(t.done, key)
 		close(finished)
 	}
+	delete(t.backoffs, key)
 }
 
 func (t *TimeoutSet) getOrCreateFinishedChan(runObj StatusKey) chan bool {
@@ -101,10 +118,52 @@ func GetTimeout(d *metav1.Duration) time.Duration {
 	return timeout
 }
 
+// GetBackoff records the number of times it has seen a TaskRun and calculates an
+// appropriate backoff deadline based on that count. Only one backoff per TaskRun
+// may be active at any moment. Requests for a new backoff in the face of an
+// existing one will be ignored and details of the existing backoff will be returned
+// instead. Further, if a calculated backoff time is after the timeout of the TaskRun
+// then the time of the timeout will be returned instead.
+//
+// Returned values are a backoff struct containing a NumAttempts field with the
+// number of attempts performed for this TaskRun and a NextAttempt field
+// describing the time at which the next attempt should be performed.
+// Additionally a boolean is returned indicating whether a backoff for the
+// TaskRun is already in progress.
+func (t *TimeoutSet) GetBackoff(tr *v1alpha1.TaskRun) (backoff, bool) {
+	t.backoffsMut.Lock()
+	defer t.backoffsMut.Unlock()
+	b := t.backoffs[tr.GetRunKey()]
+	if time.Now().Before(b.NextAttempt) {
+		return b, true
+	}
+	b.NumAttempts += 1
+	b.NextAttempt = time.Now().Add(backoffDuration(b.NumAttempts, rand.Intn))
+	timeoutDeadline := tr.Status.StartTime.Time.Add(GetTimeout(tr.Spec.Timeout))
+	if timeoutDeadline.Before(b.NextAttempt) {
+		b.NextAttempt = timeoutDeadline
+	}
+	t.backoffs[tr.GetRunKey()] = b
+	return b, false
+}
+
+func backoffDuration(count uint, jf jitterFunc) time.Duration {
+	exp := float64(count)
+	if exp > maxBackoffExponent {
+		exp = maxBackoffExponent
+	}
+	seconds := int(math.Exp2(exp))
+	jittered := 1 + jf(seconds)
+	if jittered > maxBackoffSeconds {
+		jittered = maxBackoffSeconds
+	}
+	return time.Duration(jittered) * time.Second
+}
+
 // checkPipelineRunTimeouts function creates goroutines to wait for pipelinerun to
 // finish/timeout in a given namespace
-func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string) {
-	pipelineRuns, err := t.pipelineclientset.TektonV1alpha1().PipelineRuns(namespace).List(metav1.ListOptions{})
+func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string, pipelineclientset clientset.Interface) {
+	pipelineRuns, err := pipelineclientset.TektonV1alpha1().PipelineRuns(namespace).List(metav1.ListOptions{})
 	if err != nil {
 		t.logger.Errorf("Can't get pipelinerun list in namespace %s: %s", namespace, err)
 		return
@@ -122,22 +181,22 @@ func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string) {
 
 // CheckTimeouts function iterates through all namespaces and calls corresponding
 // taskrun/pipelinerun timeout functions
-func (t *TimeoutSet) CheckTimeouts() {
-	namespaces, err := t.kubeclientset.CoreV1().Namespaces().List(metav1.ListOptions{})
+func (t *TimeoutSet) CheckTimeouts(kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) {
+	namespaces, err := kubeclientset.CoreV1().Namespaces().List(metav1.ListOptions{})
 	if err != nil {
 		t.logger.Errorf("Can't get namespaces list: %s", err)
 		return
 	}
 	for _, namespace := range namespaces.Items {
-		t.checkTaskRunTimeouts(namespace.GetName())
-		t.checkPipelineRunTimeouts(namespace.GetName())
+		t.checkTaskRunTimeouts(namespace.GetName(), pipelineclientset)
+		t.checkPipelineRunTimeouts(namespace.GetName(), pipelineclientset)
 	}
 }
 
 // checkTaskRunTimeouts function creates goroutines to wait for pipelinerun to
 // finish/timeout in a given namespace
-func (t *TimeoutSet) checkTaskRunTimeouts(namespace string) {
-	taskruns, err := t.pipelineclientset.TektonV1alpha1().TaskRuns(namespace).List(metav1.ListOptions{})
+func (t *TimeoutSet) checkTaskRunTimeouts(namespace string, pipelineclientset clientset.Interface) {
+	taskruns, err := pipelineclientset.TektonV1alpha1().TaskRuns(namespace).List(metav1.ListOptions{})
 	if err != nil {
 		t.logger.Errorf("Can't get taskrun list in namespace %s: %s", namespace, err)
 		return
@@ -172,26 +231,43 @@ func (t *TimeoutSet) waitRun(runObj StatusKey, timeout time.Duration, startTime
 		t.logger.Errorf("startTime must be specified in order for a timeout to be calculated accurately for %s", runObj.GetRunKey())
 		return
 	}
+	if callback == nil {
+		callback = defaultFunc
+	}
 	runtime := time.Since(startTime.Time)
-	finished := t.getOrCreateFinishedChan(runObj)
-
+	t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime.Time, timeout, runtime)
 	defer t.Release(runObj)
+	t.setTimer(runObj, timeout-runtime, callback)
+}
 
-	t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime.Time, timeout, runtime)
+// SetTaskRunTimer creates a blocking function for taskrun to wait for
+// 1. Stop signal, 2. TaskRun to complete or 3. a given Duration to elapse.
+//
+// Since the timer's duration is a parameter rather than being tied to
+// the lifetime of the TaskRun no resources are released after the timer
+// fires. It is the caller's responsibility to Release() the TaskRun when
+// work with it has completed.
+func (t *TimeoutSet) SetTaskRunTimer(tr *v1alpha1.TaskRun, d time.Duration) {
+	callback := t.taskRunCallbackFunc
+	if callback == nil {
+		t.logger.Errorf("attempted to set a timer for %q but no task run callback has been assigned", tr.Name)
+		return
+	}
+	t.setTimer(tr, d, callback)
+}
 
+func (t *TimeoutSet) setTimer(runObj StatusKey, timeout time.Duration, callback func(interface{})) {
+	finished := t.getOrCreateFinishedChan(runObj)
+	started := time.Now()
 	select {
 	case <-t.stopCh:
-		t.logger.Infof("Stopping timeout timer for %s", runObj.GetRunKey())
+		t.logger.Infof("stopping timer for %q", runObj.GetRunKey())
 		return
 	case <-finished:
-		t.logger.Infof("%s finished, stopping the timeout timer", runObj.GetRunKey())
+		t.logger.Infof("%q finished, stopping timer", runObj.GetRunKey())
 		return
-	case <-time.After(timeout - runtime):
-		t.logger.Infof("Timeout timer for %s has timed out (started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime, timeout, time.Since(startTime.Time))
-		if callback != nil {
-			callback(runObj)
-		} else {
-			defaultFunc(runObj)
-		}
+	case <-time.After(timeout):
+		t.logger.Infof("timer for %q has activated after %s", runObj.GetRunKey(), time.Since(started).String())
+		callback(runObj)
 	}
 }
diff --git a/pkg/reconciler/timeout_handler_test.go b/pkg/reconciler/timeout_handler_test.go
index a061d6bbeb3..6ef78687f1c 100644
--- a/pkg/reconciler/timeout_handler_test.go
+++ b/pkg/reconciler/timeout_handler_test.go
@@ -69,7 +69,7 @@ func TestTaskRunCheckTimeouts(t *testing.T) {
 	defer close(stopCh)
 	c, _ := test.SeedTestData(t, d)
 	observer, _ := observer.New(zap.InfoLevel)
-	th := NewTimeoutHandler(c.Kube, c.Pipeline, stopCh, zap.New(observer).Sugar())
+	th := NewTimeoutHandler(stopCh, zap.New(observer).Sugar())
 	gotCallback := sync.Map{}
 	f := func(tr interface{}) {
 		trNew := tr.(*v1alpha1.TaskRun)
@@ -77,7 +77,7 @@ func TestTaskRunCheckTimeouts(t *testing.T) {
 	}
 
 	th.SetTaskRunCallbackFunc(f)
-	th.CheckTimeouts()
+	th.CheckTimeouts(c.Kube, c.Pipeline)
 
 	for _, tc := range []struct {
 		name           string
@@ -172,7 +172,7 @@ func TestPipelinRunCheckTimeouts(t *testing.T) {
 	c, _ := test.SeedTestData(t, d)
 	stopCh := make(chan struct{})
 	observer, _ := observer.New(zap.InfoLevel)
-	th := NewTimeoutHandler(c.Kube, c.Pipeline, stopCh, zap.New(observer).Sugar())
+	th := NewTimeoutHandler(stopCh, zap.New(observer).Sugar())
 	defer close(stopCh)
 
 	gotCallback := sync.Map{}
@@ -182,7 +182,7 @@ func TestPipelinRunCheckTimeouts(t *testing.T) {
 	}
 
 	th.SetPipelineRunCallbackFunc(f)
-	th.CheckTimeouts()
+	th.CheckTimeouts(c.Kube, c.Pipeline)
 	for _, tc := range []struct {
 		name           string
 		pr             *v1alpha1.PipelineRun
@@ -247,7 +247,7 @@ func TestWithNoFunc(t *testing.T) {
 	stopCh := make(chan struct{})
 	c, _ := test.SeedTestData(t, d)
 	observer, _ := observer.New(zap.InfoLevel)
-	testHandler := NewTimeoutHandler(c.Kube, c.Pipeline, stopCh, zap.New(observer).Sugar())
+	testHandler := NewTimeoutHandler(stopCh, zap.New(observer).Sugar())
 	defer func() {
 		// this delay will ensure there is no race condition between stopCh/ timeout channel getting triggered
 		time.Sleep(10 * time.Millisecond)
@@ -256,6 +256,98 @@ func TestWithNoFunc(t *testing.T) {
 			t.Fatal("Expected CheckTimeouts function not to panic")
 		}
 	}()
-	testHandler.CheckTimeouts()
+	testHandler.CheckTimeouts(c.Kube, c.Pipeline)
 
 }
+
+// TestGetTimeout checks that the timeout calculated for a given taskrun falls
+// back to the default taskrun timeout when none is provided.
+func TestGetTimeout(t *testing.T) {
+	testCases := []struct {
+		description      string
+		inputDuration    *metav1.Duration
+		expectedDuration time.Duration
+	}{
+		{
+			description:      "returns same duration as input when input is not nil",
+			inputDuration:    &metav1.Duration{Duration: 2 * time.Second},
+			expectedDuration: 2 * time.Second,
+		},
+		{
+			description:      "returns an end time using the default timeout if a TaskRun's timeout is nil",
+			inputDuration:    nil,
+			expectedDuration: defaultTimeout,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.description, func(t *testing.T) {
+			receivedDuration := GetTimeout(tc.inputDuration)
+			if receivedDuration != tc.expectedDuration {
+				t.Errorf("expected %q received %q", tc.expectedDuration.String(), receivedDuration.String())
+			}
+		})
+	}
+}
+
+// TestSetTaskRunTimer checks that the SetTaskRunTimer method correctly calls the TaskRun
+// callback after a set amount of time.
+func TestSetTaskRunTimer(t *testing.T) {
+	taskRun := tb.TaskRun("test-taskrun-arbitrary-timer", testNs, tb.TaskRunSpec(
+		tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")),
+		tb.TaskRunTimeout(2*time.Second),
+	), tb.TaskRunStatus(tb.Condition(apis.Condition{
+		Type:   apis.ConditionSucceeded,
+		Status: corev1.ConditionUnknown}),
+		tb.TaskRunStartTime(time.Now().Add(-10*time.Second)),
+	))
+
+	stopCh := make(chan struct{})
+	observer, _ := observer.New(zap.InfoLevel)
+	testHandler := NewTimeoutHandler(stopCh, zap.New(observer).Sugar())
+	timerDuration := 50 * time.Millisecond
+	timerFailDeadline := 100 * time.Millisecond
+	doneCh := make(chan struct{})
+	callback := func(_ interface{}) {
+		close(doneCh)
+	}
+	testHandler.SetTaskRunCallbackFunc(callback)
+	go testHandler.SetTaskRunTimer(taskRun, timerDuration)
+	select {
+	case <-doneCh:
+		// The task run timer executed before the failure deadline
+	case <-time.After(timerFailDeadline):
+		t.Errorf("timer did not execute task run callback func within expected time")
+	}
+}
+
+// TestBackoffDuration asserts that the backoffDuration func returns Durations
+// within the timeout handler's bounds.
+func TestBackoffDuration(t *testing.T) {
+	testcases := []struct {
+		description      string
+		inputCount       uint
+		jitterFunc       func(int) int
+		expectedDuration time.Duration
+	}{
+		{
+			description:      "an input count that is too large is rounded to the maximum allowed exponent",
+			inputCount:       uint(maxBackoffExponent + 1),
+			jitterFunc:       func(in int) int { return in },
+			expectedDuration: maxBackoffSeconds * time.Second,
+		},
+		{
+			description:      "a jittered number of seconds that is above the maximum allowed is constrained",
+			inputCount:       1,
+			jitterFunc:       func(in int) int { return maxBackoffSeconds + 1 },
+			expectedDuration: maxBackoffSeconds * time.Second,
+		},
+	}
+	for _, tc := range testcases {
+		t.Run(tc.description, func(t *testing.T) {
+			result := backoffDuration(tc.inputCount, tc.jitterFunc)
+			if result != tc.expectedDuration {
+				t.Errorf("expected %q received %q", tc.expectedDuration.String(), result.String())
+			}
+		})
+	}
+}
diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go
index 60bb0d37f82..95cc227d782 100644
--- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go
+++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go
@@ -55,7 +55,7 @@ func getPipelineRunController(t *testing.T, d test.Data, recorder record.EventRe
 	stopCh := make(chan struct{})
 	configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace())
 	logger := zap.New(observer).Sugar()
-	th := reconciler.NewTimeoutHandler(c.Kube, c.Pipeline, stopCh, logger)
+	th := reconciler.NewTimeoutHandler(stopCh, logger)
 	return test.TestAssets{
 		Controller: NewController(
 			reconciler.Options{
diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun.go b/pkg/reconciler/v1alpha1/taskrun/taskrun.go
index 813fca5b1d2..5894380a18a 100644
--- a/pkg/reconciler/v1alpha1/taskrun/taskrun.go
+++ b/pkg/reconciler/v1alpha1/taskrun/taskrun.go
@@ -310,30 +310,9 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
 		return err
 	}
 	if pod == nil {
-		// Pod is not present, create pod.
 		pod, err = c.createPod(tr, rtr)
 		if err != nil {
-			// This Run has failed, so we need to mark it as failed and stop reconciling it
-			var reason, msg string
-			if isExceededResourceQuotaError(err) {
-				reason = reasonExceededResourceQuota
-				msg = getExceededResourcesMessage(tr)
-			} else {
-				reason = reasonCouldntGetTask
-				if tr.Spec.TaskRef != nil {
-					msg = fmt.Sprintf("Missing or invalid Task %s/%s", tr.Namespace, tr.Spec.TaskRef.Name)
-				} else {
-					msg = fmt.Sprintf("Invalid TaskSpec")
-				}
-			}
-			tr.Status.SetCondition(&apis.Condition{
-				Type:    apis.ConditionSucceeded,
-				Status:  corev1.ConditionFalse,
-				Reason:  reason,
-				Message: fmt.Sprintf("%s: %v", msg, err),
-			})
-			c.Recorder.Eventf(tr, corev1.EventTypeWarning, "BuildCreationFailed", "Failed to create build pod %q: %v", tr.Name, err)
-			c.Logger.Errorf("Failed to create build pod for task %q :%v", err, tr.Name)
+			c.handlePodCreationError(tr, err)
 			return nil
 		}
 		go c.timeoutHandler.WaitTaskRun(tr, tr.Status.StartTime)
@@ -424,6 +403,36 @@ func updateStatusFromPod(taskRun *v1alpha1.TaskRun, pod *corev1.Pod, resourceLis
 	updateTaskRunResourceResult(taskRun, pod, resourceLister, kubeclient, logger)
 }
 
+func (c *Reconciler) handlePodCreationError(tr *v1alpha1.TaskRun, err error) {
+	var reason, msg string
+	var succeededStatus corev1.ConditionStatus
+	if isExceededResourceQuotaError(err) {
+		succeededStatus = corev1.ConditionUnknown
+		reason = reasonExceededResourceQuota
+		backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr)
+		if !currentlyBackingOff {
+			go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt))
+		}
+		msg = fmt.Sprintf("%s, reattempted %d times", getExceededResourcesMessage(tr), backoff.NumAttempts)
+	} else {
+		succeededStatus = corev1.ConditionFalse
+		reason = reasonCouldntGetTask
+		if tr.Spec.TaskRef != nil {
+			msg = fmt.Sprintf("Missing or invalid Task %s/%s", tr.Namespace, tr.Spec.TaskRef.Name)
+		} else {
+			msg = fmt.Sprintf("Invalid TaskSpec")
+		}
+	}
+	tr.Status.SetCondition(&apis.Condition{
+		Type:    apis.ConditionSucceeded,
+		Status:  succeededStatus,
+		Reason:  reason,
+		Message: fmt.Sprintf("%s: %v", msg, err),
+	})
+	c.Recorder.Eventf(tr, corev1.EventTypeWarning, "BuildCreationFailed", "Failed to create build pod %q: %v", tr.Name, err)
+	c.Logger.Errorf("Failed to create build pod for task %q: %v", tr.Name, err)
+}
+
 func updateTaskRunResourceResult(taskRun *v1alpha1.TaskRun, pod *corev1.Pod, resourceLister listers.PipelineResourceLister, kubeclient kubernetes.Interface, logger *zap.SugaredLogger) {
 	if resources.TaskRunHasOutputImageResource(resourceLister.PipelineResources(taskRun.Namespace).Get, taskRun) && taskRun.IsSuccessful() {
 		for _, container := range pod.Spec.Containers {
@@ -512,7 +521,6 @@ func (c *Reconciler) updateLabelsAndAnnotations(tr *v1alpha1.TaskRun) (*v1alpha1
 	return newTr, nil
 }
 
-
 // createPod creates a Pod based on the Task's configuration, with pvcName as a volumeMount
 // TODO(dibyom): Refactor resource setup/templating logic to its own function in the resources package
 func (c *Reconciler) createPod(tr *v1alpha1.TaskRun, rtr *resources.ResolvedTaskResources) (*corev1.Pod, error) {
@@ -613,9 +621,14 @@ func (c *Reconciler) checkTimeout(tr *v1alpha1.TaskRun, ts *v1alpha1.TaskSpec, d
 	c.Logger.Infof("Checking timeout for TaskRun %q (startTime %s, timeout %s, runtime %s)", tr.Name, tr.Status.StartTime, timeout, runtime)
 	if runtime > timeout {
 		c.Logger.Infof("TaskRun %q is timeout (runtime %s over %s), deleting pod", tr.Name, runtime, timeout)
-		if err := dp(tr.Status.PodName, &metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
-			c.Logger.Errorf("Failed to terminate pod: %v", err)
-			return true, err
+		// tr.Status.PodName will be empty if the pod was never successfully created. This condition
+		// can be reached, for example, by the pod never being schedulable due to limits imposed by
+		// a namespace's ResourceQuota.
+		if tr.Status.PodName != "" {
+			if err := dp(tr.Status.PodName, &metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
+				c.Logger.Errorf("Failed to terminate pod: %v", err)
+				return true, err
+			}
 		}
 
 		timeoutMsg := fmt.Sprintf("TaskRun %q failed to finish within %q", tr.Name, timeout.String())
diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go
index 4ecfae85919..ca7e9a6ad91 100644
--- a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go
+++ b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go
@@ -15,6 +15,7 @@ package taskrun
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"strings"
 	"testing"
@@ -40,9 +41,11 @@ import (
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest/observer"
 	corev1 "k8s.io/api/core/v1"
+	k8sapierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	k8sruntimeschema "k8s.io/apimachinery/pkg/runtime/schema"
 	fakekubeclientset "k8s.io/client-go/kubernetes/fake"
 	ktesting "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
@@ -192,7 +195,7 @@ func getTaskRunController(t *testing.T, d test.Data) test.TestAssets {
 	configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace())
 	stopCh := make(chan struct{})
 	logger := zap.New(observer).Sugar()
-	th := reconciler.NewTimeoutHandler(c.Kube, c.Pipeline, stopCh, logger)
+	th := reconciler.NewTimeoutHandler(stopCh, logger)
 	return test.TestAssets{
 		Controller: NewController(
 			reconciler.Options{
@@ -1840,3 +1843,66 @@ func TestUpdateStatusFromPod(t *testing.T) {
 		})
 	}
 }
+
+func TestHandlePodCreationError(t *testing.T) {
+	taskRun := tb.TaskRun("test-taskrun-pod-creation-failed", "foo", tb.TaskRunSpec(
+		tb.TaskRunTaskRef(simpleTask.Name),
+	), tb.TaskRunStatus(
+		tb.TaskRunStartTime(time.Now()),
+		tb.Condition(apis.Condition{
+			Type:   apis.ConditionSucceeded,
+			Status: corev1.ConditionUnknown,
+		}),
+	))
+	d := test.Data{
+		TaskRuns: []*v1alpha1.TaskRun{taskRun},
+		Tasks:    []*v1alpha1.Task{simpleTask},
+	}
+	testAssets := getTaskRunController(t, d)
+	c, ok := testAssets.Controller.Reconciler.(*Reconciler)
+	if !ok {
+		t.Errorf("failed to construct instance of taskrun reconciler")
+		return
+	}
+
+	// Prevent backoff timer from starting
+	c.timeoutHandler.SetTaskRunCallbackFunc(nil)
+
+	testcases := []struct {
+		description    string
+		err            error
+		expectedType   apis.ConditionType
+		expectedStatus corev1.ConditionStatus
+		expectedReason string
+	}{
+		{
+			description:    "exceeded quota errors are surfaced in taskrun condition but do not fail taskrun",
+			err:            k8sapierrors.NewForbidden(k8sruntimeschema.GroupResource{Group: "foo", Resource: "bar"}, "baz", errors.New("exceeded quota")),
+			expectedType:   apis.ConditionSucceeded,
+			expectedStatus: corev1.ConditionUnknown,
+			expectedReason: reasonExceededResourceQuota,
+		},
+		{
+			description:    "errors other than exceeded quota fail the taskrun",
+			err:            errors.New("this is a fatal error"),
+			expectedType:   apis.ConditionSucceeded,
+			expectedStatus: corev1.ConditionFalse,
+			expectedReason: reasonCouldntGetTask,
+		},
+	}
+	for _, tc := range testcases {
+		t.Run(tc.description, func(t *testing.T) {
+			c.handlePodCreationError(taskRun, tc.err)
+			foundCondition := false
+			for _, cond := range taskRun.Status.Conditions {
+				if cond.Type == tc.expectedType && cond.Status == tc.expectedStatus && cond.Reason == tc.expectedReason {
+					foundCondition = true
+					break
+				}
+			}
+			if !foundCondition {
+				t.Errorf("expected to find condition type %q, status %q and reason %q", tc.expectedType, tc.expectedStatus, tc.expectedReason)
+			}
+		})
+	}
+}