Skip to content

Commit

Permalink
Switch PipelineRun timeout -> TaskRun logic to instead signal the Tas…
Browse files Browse the repository at this point in the history
…kRuns to stop

Fixes #5127

As noted in #5127, the logic around calculating a timeout for a `PipelineRun`'s `TaskRun` to make sure that the `TaskRun`'s timeout is always going to end before the `PipelineRun`'s timeout ends is problematic. It can result in race conditions where a `TaskRun` gets timed out, immediately followed by the `PipelineRun` being reconciled while not yet having hit the end of its own timeout. This change gets rid of that behavior, and instead sets the `TaskRun.Spec.Status` to a new value, `TaskRunTimedOut`, with the `TaskRun` reconciler handling that in the same way it does setting `TaskRun.Spec.Status` to `TaskRunCancelled`.

By doing this, we can unify the behavior for both `TaskRun`s and `Run`s upon `PipelineRun`s timing out, given that we already cancel `Run`s upon `PipelineRun` timeout, and we can kill off a bunch of flaky outcomes for `PipelineRun`s.

Huge thanks to @jerop for suggesting this approach!

Signed-off-by: Andrew Bayer <andrew.bayer@gmail.com>
  • Loading branch information
abayer committed Jul 14, 2022
1 parent 5126d15 commit 91a14c1
Show file tree
Hide file tree
Showing 10 changed files with 634 additions and 24 deletions.
8 changes: 8 additions & 0 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ const (
// TaskRunSpecStatusCancelled indicates that the user wants to cancel the task,
// if not already cancelled or terminated
TaskRunSpecStatusCancelled = "TaskRunCancelled"
// TaskRunSpecStatusTimedOut indicates that the PipelineRun owning this task wants to mark it as timed out,
// if not already cancelled or terminated
TaskRunSpecStatusTimedOut = "TaskRunTimedOut"
)

// TaskRunDebug defines the breakpoint config for a particular TaskRun
Expand Down Expand Up @@ -424,6 +427,11 @@ func (tr *TaskRun) IsCancelled() bool {
return tr.Spec.Status == TaskRunSpecStatusCancelled
}

// IsSpecTimedOut returns true if the TaskRun's spec status is set to TimedOut state
func (tr *TaskRun) IsSpecTimedOut() bool {
return tr.Spec.Status == TaskRunSpecStatusTimedOut
}

// HasTimedOut returns true if the TaskRun runtime is beyond the allowed timeout
func (tr *TaskRun) HasTimedOut(ctx context.Context, c clock.PassiveClock) bool {
if tr.Status.StartTime.IsZero() {
Expand Down
15 changes: 13 additions & 2 deletions pkg/apis/pipeline/v1beta1/taskrun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestTaskRunIsDone(t *testing.T) {
},
}
if !tr.IsDone() {
t.Fatal("Expected pipelinerun status to be done")
t.Fatal("Expected taskrun status to be done")
}
}

Expand All @@ -131,7 +131,18 @@ func TestTaskRunIsCancelled(t *testing.T) {
},
}
if !tr.IsCancelled() {
t.Fatal("Expected pipelinerun status to be cancelled")
t.Fatal("Expected taskrun status to be cancelled")
}
}

func TestTaskRunIsSpecTimedOut(t *testing.T) {
tr := &v1beta1.TaskRun{
Spec: v1beta1.TaskRunSpec{
Status: v1beta1.TaskRunSpecStatusTimedOut,
},
}
if !tr.IsSpecTimedOut() {
t.Fatal("Expected taskrun status to be timed out")
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/pipeline/v1beta1/taskrun_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (ts *TaskRunSpec) Validate(ctx context.Context) (errs *apis.FieldError) {
}

if ts.Status != "" {
if ts.Status != TaskRunSpecStatusCancelled {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be %s", ts.Status, TaskRunSpecStatusCancelled), "status"))
if ts.Status != TaskRunSpecStatusCancelled && ts.Status != TaskRunSpecStatusTimedOut {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be %s or %s", ts.Status, TaskRunSpecStatusCancelled, TaskRunSpecStatusTimedOut), "status"))
}
}
if ts.Timeout != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/pipeline/v1beta1/taskrun_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestTaskRunSpec_Invalidate(t *testing.T) {
},
Status: "TaskRunCancell",
},
wantErr: apis.ErrInvalidValue("TaskRunCancell should be TaskRunCancelled", "status"),
wantErr: apis.ErrInvalidValue("TaskRunCancell should be TaskRunCancelled or TaskRunTimedOut", "status"),
}, {
name: "invalid taskspec",
spec: v1beta1.TaskRunSpec{
Expand Down
32 changes: 22 additions & 10 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ const (
// ReasonCouldntCancel indicates that a PipelineRun was cancelled but attempting to update
// all of the running TaskRuns as cancelled failed.
ReasonCouldntCancel = "PipelineRunCouldntCancel"
// ReasonCouldntTimeOut indicates that a PipelineRun was timed out but attempting to update
// all of the running TaskRuns as timed out failed.
ReasonCouldntTimeOut = "PipelineRunCouldntTimeOut"
// ReasonInvalidTaskResultReference indicates a task result was declared
// but was not initialized by that task
ReasonInvalidTaskResultReference = "InvalidTaskResultReference"
Expand Down Expand Up @@ -579,6 +582,13 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
// Reset the skipped status to trigger recalculation
pipelineRunFacts.ResetSkippedCache()

// If the pipelinerun has timed out, mark tasks as timed out and update status
if pr.HasTimedOut(ctx, c.Clock) {
if err := timeoutPipelineRun(ctx, logger, pr, c.PipelineClientSet); err != nil {
return err
}
}

after := pipelineRunFacts.GetPipelineConditionStatus(ctx, pr, logger, c.Clock)
switch after.Status {
case corev1.ConditionTrue:
Expand Down Expand Up @@ -611,8 +621,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
return nil
}

// processRunTimeouts custom tasks are requested to cancel, if they have timed out. Custom tasks can do any cleanup
// during this step.
// processRunTimeouts custom tasks are requested to cancel, if they have timed out. Since there is no guarantee that a
// custom task reconciler has its own logic for timing out a Run, this needs to be handled by the PipelineRun reconciler.
// Custom tasks can do any cleanup during this step.
func (c *Reconciler) processRunTimeouts(ctx context.Context, pr *v1beta1.PipelineRun, pipelineState resources.PipelineRunState) error {
errs := []string{}
logger := logging.FromContext(ctx)
Expand All @@ -621,7 +632,7 @@ func (c *Reconciler) processRunTimeouts(ctx context.Context, pr *v1beta1.Pipelin
}
for _, rpt := range pipelineState {
if rpt.IsCustomTask() {
if rpt.Run != nil && !rpt.Run.IsCancelled() && (pr.HasTimedOut(ctx, c.Clock) || (rpt.Run.HasTimedOut(c.Clock) && !rpt.Run.IsDone())) {
if rpt.Run != nil && !rpt.Run.IsCancelled() && rpt.Run.HasTimedOut(c.Clock) && !rpt.Run.IsDone() {
logger.Infof("Cancelling run task: %s due to timeout.", rpt.RunName)
err := cancelRun(ctx, rpt.RunName, pr.Namespace, c.PipelineClientSet)
if err != nil {
Expand Down Expand Up @@ -1131,22 +1142,23 @@ func getTaskRunTimeout(ctx context.Context, pr *v1beta1.PipelineRun, rpt *resour

// calculateTaskRunTimeout returns the timeout to set when creating the ResolvedPipelineTask.
// `timeout` is:
// - If ResolvedPipelineTask is a Task, `timeout` is the minimum of Tasks Timeout and Pipeline Timeout
// - If ResolvedPipelineTask is a Finally Task, `timeout` is the Pipeline Timeout
// If there is no timeout for the TaskRun, returns 0.
// - If ResolvedPipelineTask is a Task, `timeout` is the PipelineRun's Tasks timeout if set, and otherwise the PipelineRun's Pipeline timeout.
// - If ResolvedPipelineTask is a Finally Task, `timeout` is the PipelineRun's Finally timeout if set, or the PipelineRun's Pipeline timeout.
//
// If ResolvedPipelineTask.PipelineTask.Timeout is set, and is less than `timeout`, ResolvedPipelineTask.PipelineTask.Timeout is returned
// If ResolvedPipelineTask.PipelineTask.Timeout is not set or is greater than `timeout`, `timeout` is returned.
// If there is no timeout for the TaskRun or the PipelineRun, returns 0.
// If pipeline level timeouts have already been exceeded, returns 1 second.
func calculateTaskRunTimeout(timeout time.Duration, pr *v1beta1.PipelineRun, rpt *resources.ResolvedPipelineTask, c clock.PassiveClock) *metav1.Duration {
if timeout != apisconfig.NoTimeoutDuration {
pElapsedTime := c.Since(pr.Status.StartTime.Time)
if pElapsedTime > timeout {
return &metav1.Duration{Duration: 1 * time.Second}
}
timeRemaining := (timeout - pElapsedTime)
// Return the smaller of timeRemaining and rpt.pipelineTask.timeout
if rpt.PipelineTask.Timeout != nil && rpt.PipelineTask.Timeout.Duration < timeRemaining {
if rpt.PipelineTask.Timeout != nil && rpt.PipelineTask.Timeout.Duration < timeout {
return &metav1.Duration{Duration: rpt.PipelineTask.Timeout.Duration}
}
return &metav1.Duration{Duration: timeRemaining}
return &metav1.Duration{Duration: timeout}
}

if timeout == apisconfig.NoTimeoutDuration && rpt.PipelineTask.Timeout != nil {
Expand Down
160 changes: 151 additions & 9 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,16 +1736,44 @@ spec:
name: test-pipeline
serviceAccountName: test-sa
status:
conditions:
- message: running...
reason: Running
status: Unknown
type: Succeeded
startTime: "2021-12-31T00:00:00Z"
runs:
test-pipeline-run-custom-task-hello-world-1:
pipelineTaskName: hello-world-1
status:
conditions:
- status: Unknown
type: Succeeded
`)}
prs[0].Spec.Timeout = tc.timeout
prs[0].Spec.Timeouts = tc.timeouts

runs := []*v1alpha1.Run{mustParseRunWithObjectMeta(t,
taskRunObjectMeta("test-pipeline-run-custom-task-hello-world-1", "test", "test-pipeline-run-custom-task",
"test-pipeline", "hello-world-1", true),
`
spec:
ref:
apiVersion: example.dev/v0
kind: Example
status:
conditions:
- status: Unknown
type: Succeeded
startTime: "2021-12-31T11:58:59Z"
`)}

cms := []*corev1.ConfigMap{withCustomTasks(newFeatureFlagsConfigMap())}
d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
ConfigMaps: cms,
Runs: runs,
}
prt := newPipelineRunTest(d, t)
defer prt.Cancel()
Expand All @@ -1763,9 +1791,9 @@ status:
}

gotTimeoutValue := postReconcileRun.GetTimeout()
expectedTimeoutValue := time.Second
expectedTimeoutValue := time.Hour

if d := cmp.Diff(gotTimeoutValue, expectedTimeoutValue); d != "" {
if d := cmp.Diff(expectedTimeoutValue, gotTimeoutValue); d != "" {
t.Fatalf("Expected timeout for created Run, but got a mismatch %s", diff.PrintWantGot(d))
}

Expand Down Expand Up @@ -2755,6 +2783,120 @@ spec:
}
}

func TestReconcileFailsTaskRunTimeOut(t *testing.T) {
prName := "test-pipeline-fails-to-timeout"

// TestReconcileFailsTaskRunTimeOut runs "Reconcile" on a PipelineRun with a single TaskRun.
// The TaskRun cannot be timed out. Check that the pipelinerun timeout fails, that reconcile fails and
// an event is generated
names.TestingSeed()
prs := []*v1beta1.PipelineRun{parse.MustParsePipelineRun(t, `
metadata:
name: test-pipeline-fails-to-timeout
namespace: foo
spec:
pipelineRef:
name: test-pipeline
timeout: 1h0m0s
status:
conditions:
- message: running...
reason: Running
status: Unknown
type: Succeeded
startTime: "2021-12-31T22:59:00Z"
taskRuns:
test-pipeline-fails-to-timeouthello-world-1:
pipelineTaskName: hello-world-1
`)}
ps := []*v1beta1.Pipeline{parse.MustParsePipeline(t, `
metadata:
name: test-pipeline
namespace: foo
spec:
tasks:
- name: hello-world-1
taskRef:
name: hello-world
- name: hello-world-2
taskRef:
name: hello-world
`)}
tasks := []*v1beta1.Task{simpleHelloWorldTask}
taskRuns := []*v1beta1.TaskRun{
getTaskRun(
t,
"test-pipeline-fails-to-timeouthello-world-1",
prName,
"test-pipeline",
"hello-world",
corev1.ConditionUnknown,
),
}

cms := []*corev1.ConfigMap{withEnabledAlphaAPIFields(newFeatureFlagsConfigMap())}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: tasks,
TaskRuns: taskRuns,
ConfigMaps: cms,
}

testAssets, cancel := getPipelineRunController(t, d)
defer cancel()
c := testAssets.Controller
clients := testAssets.Clients
failingReactorActivated := true

// Make the patch call fail, i.e. make it so that the controller fails to cancel the TaskRun
clients.Pipeline.PrependReactor("patch", "taskruns", func(action ktesting.Action) (bool, runtime.Object, error) {
return failingReactorActivated, nil, fmt.Errorf("i'm sorry Dave, i'm afraid i can't do that")
})

err := c.Reconciler.Reconcile(testAssets.Ctx, "foo/test-pipeline-fails-to-timeout")
if err == nil {
t.Errorf("Expected to see error returned from reconcile after failing to timeout TaskRun but saw none!")
}

// Check that the PipelineRun is still running with correct error message
reconciledRun, err := clients.Pipeline.TektonV1beta1().PipelineRuns("foo").Get(testAssets.Ctx, "test-pipeline-fails-to-timeout", metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err)
}

if val, ok := reconciledRun.GetLabels()[pipeline.PipelineLabelKey]; !ok {
t.Fatalf("expected pipeline label")
} else if d := cmp.Diff("test-pipeline", val); d != "" {
t.Errorf("expected to see pipeline label. Diff %s", diff.PrintWantGot(d))
}

// The PipelineRun should not be timed out b/c we couldn't timeout the TaskRun
checkPipelineRunConditionStatusAndReason(t, reconciledRun, corev1.ConditionUnknown, ReasonCouldntTimeOut)
// The event here is "Normal" because in case we fail to timeout we leave the condition to unknown
// Further reconcile might converge then the status of the pipeline.
// See https://github.com/tektoncd/pipeline/issues/2647 for further details.
wantEvents := []string{
"Normal PipelineRunCouldntTimeOut PipelineRun \"test-pipeline-fails-to-timeout\" was timed out but had errors trying to time out TaskRuns and/or Runs",
"Warning InternalError 1 error occurred",
}
err = eventstest.CheckEventsOrdered(t, testAssets.Recorder.Events, prName, wantEvents)
if err != nil {
t.Errorf(err.Error())
}

// Turn off failing reactor and retry reconciliation
failingReactorActivated = false

err = c.Reconciler.Reconcile(testAssets.Ctx, "foo/test-pipeline-fails-to-timeout")
if err == nil {
// No error is ok
} else if ok, _ := controller.IsRequeueKey(err); !ok { // Requeue is also fine.
t.Errorf("Expected to timeout TaskRun successfully!")
}
}

func TestReconcilePropagateLabelsAndAnnotations(t *testing.T) {
names.TestingSeed()

Expand Down Expand Up @@ -3094,7 +3236,7 @@ status:
reconciledRun, _ := prt.reconcileRun("foo", "test-pipeline-retry-run-with-timeout", []string{}, false)

if len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus) != tc.retries {
t.Fatalf(" %d retry expected but %d ", tc.retries, len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus))
t.Fatalf(" %d retries expected but got %d ", tc.retries, len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus))
}

if status := reconciledRun.Status.TaskRuns["hello-world-1"].Status.GetCondition(apis.ConditionSucceeded).Status; status != tc.conditionSucceeded {
Expand Down Expand Up @@ -3240,7 +3382,7 @@ func TestGetTaskRunTimeout(t *testing.T) {
},
},
},
expected: &metav1.Duration{Duration: 10 * time.Minute},
expected: &metav1.Duration{Duration: 20 * time.Minute},
}, {
name: "taskrun with elapsed time; task.timeout applies",
timeoutFields: &v1beta1.TimeoutFields{
Expand All @@ -3259,11 +3401,11 @@ func TestGetTaskRunTimeout(t *testing.T) {
},
},
},
expected: &metav1.Duration{Duration: 10 * time.Minute},
expected: &metav1.Duration{Duration: 15 * time.Minute},
}, {
name: "taskrun with elapsed time; timeouts.pipeline applies",
timeoutFields: &v1beta1.TimeoutFields{
Tasks: &metav1.Duration{Duration: 20 * time.Minute},
Pipeline: &metav1.Duration{Duration: 20 * time.Minute},
},
startTime: now.Add(-10 * time.Minute),
rpt: &resources.ResolvedPipelineTask{
Expand All @@ -3278,7 +3420,7 @@ func TestGetTaskRunTimeout(t *testing.T) {
},
},
},
expected: &metav1.Duration{Duration: 10 * time.Minute},
expected: &metav1.Duration{Duration: 15 * time.Minute},
}}

for _, tc := range tcs {
Expand All @@ -3296,7 +3438,7 @@ func TestGetTaskRunTimeout(t *testing.T) {
},
},
}
if d := cmp.Diff(getTaskRunTimeout(context.TODO(), pr, tc.rpt, testClock), tc.expected); d != "" {
if d := cmp.Diff(tc.expected, getTaskRunTimeout(context.TODO(), pr, tc.rpt, testClock)); d != "" {
t.Errorf("Unexpected task run timeout. Diff %s", diff.PrintWantGot(d))
}
})
Expand Down Expand Up @@ -3449,7 +3591,7 @@ func TestGetFinallyTaskRunTimeout(t *testing.T) {
},
},
},
expected: &metav1.Duration{Duration: 11 * time.Minute},
expected: &metav1.Duration{Duration: 15 * time.Minute},
}}

for _, tc := range tcs {
Expand Down
Loading

0 comments on commit 91a14c1

Please sign in to comment.