From 242155d666ecd3772bc5b2a88bb26e3d878b396b Mon Sep 17 00:00:00 2001 From: Priti Desai Date: Mon, 14 Sep 2020 10:43:19 -0700 Subject: [PATCH] enabling consuming task results in finally Final tasks can be configured to consume Results of PipelineTasks from tasks section --- docs/pipelines.md | 55 ++--- .../pipelinerun-with-final-tasks.yaml | 14 ++ .../pipeline/v1beta1/pipeline_validation.go | 25 +- .../v1beta1/pipeline_validation_test.go | 43 +++- pkg/reconciler/pipelinerun/pipelinerun.go | 13 +- .../pipelinerun/pipelinerun_test.go | 181 ++++++++++++++ .../resources/pipelinerunresolution.go | 13 + .../resources/pipelinerunresolution_test.go | 68 ++++++ .../pipelinerun/resources/pipelinerunstate.go | 9 + .../resources/pipelinerunstate_test.go | 49 ++++ .../resources/resultrefresolution.go | 9 + test/pipelinefinally_test.go | 223 ++++++++++++++---- .../pipelinerun-with-final-tasks.yaml | 12 + 13 files changed, 629 insertions(+), 85 deletions(-) diff --git a/docs/pipelines.md b/docs/pipelines.md index 39f763d499b..188c3d5ff22 100644 --- a/docs/pipelines.md +++ b/docs/pipelines.md @@ -788,6 +788,32 @@ spec: value: "someURL" ``` +#### Consuming `Task` execution results in `finally` + +Final tasks can be configured to consume `Results` of `PipelineTask` from the `tasks` section: + +```yaml +spec: + tasks: + - name: clone-app-repo + taskRef: + name: git-clone + finally: + - name: discover-git-commit + params: + - name: commit + value: $(tasks.clone-app-repo.results.commit) +``` +**Note:** The scheduling of such final task does not change, it will still be executed in parallel with other +final tasks after all non-final tasks are done. + +The controller resolves task results before executing the finally task `discover-git-commit`. If the task +`clone-app-repo` failed or skipped with [when expression](#guard-task-execution-using-whenexpressions) resulting in +uninitialized task result `commit`, the finally Task `discover-git-commit` will be included in the list of +`skippedTasks` and continues executing rest of the final tasks. The pipeline exits with `completion` instead of +`success` if a finally task is added to the list of `skippedTasks`. + + ### `PipelineRun` Status with `finally` With `finally`, `PipelineRun` status is calculated based on `PipelineTasks` under `tasks` section and final tasks. @@ -900,35 +926,6 @@ no `runAfter` can be specified in final tasks. final tasks are guaranteed to be executed after all `PipelineTasks` therefore no `conditions` can be specified in final tasks. -#### Cannot configure `Task` execution results with `finally` - -Final tasks can not be configured to consume `Results` of `PipelineTask` from `tasks` section i.e. the following -example is not supported right now but we are working on adding support for the same (tracked in issue -[#2557](https://github.com/tektoncd/pipeline/issues/2557)). - -```yaml -spec: - tasks: - - name: count-comments-before - taskRef: - Name: count-comments - - name: add-comment - taskRef: - Name: add-comment - - name: count-comments-after - taskRef: - Name: count-comments - finally: - - name: check-count - taskRef: - Name: check-count - params: - - name: before-count - value: $(tasks.count-comments-before.results.count) #invalid - - name: after-count - value: $(tasks.count-comments-after.results.count) #invalid -``` - #### Cannot configure `Pipeline` result with `finally` Final tasks can emit `Results` but results emitted from the final tasks can not be configured in the diff --git a/examples/v1beta1/pipelineruns/pipelinerun-with-final-tasks.yaml b/examples/v1beta1/pipelineruns/pipelinerun-with-final-tasks.yaml index 170884e4dfd..222a389a0f9 100644 --- a/examples/v1beta1/pipelineruns/pipelinerun-with-final-tasks.yaml +++ b/examples/v1beta1/pipelineruns/pipelinerun-with-final-tasks.yaml @@ -164,6 +164,20 @@ spec: workspaces: - name: source workspace: git-source + - name: check-git-commit + params: + - name: commit + value: $(tasks.clone-app-repo.results.commit) + taskSpec: + params: + - name: commit + steps: + - name: check-commit-initialized + image: alpine + script: | + if [[ ! $(params.commit) ]]; then + exit 1 + fi --- # PipelineRun to execute pipeline - clone-into-workspace-and-cleanup-workspace diff --git a/pkg/apis/pipeline/v1beta1/pipeline_validation.go b/pkg/apis/pipeline/v1beta1/pipeline_validation.go index f2431532800..556966b12f5 100644 --- a/pkg/apis/pipeline/v1beta1/pipeline_validation.go +++ b/pkg/apis/pipeline/v1beta1/pipeline_validation.go @@ -68,7 +68,7 @@ func (ps *PipelineSpec) Validate(ctx context.Context) (errs *apis.FieldError) { // Validate the pipeline's results errs = errs.Also(validatePipelineResults(ps.Results)) errs = errs.Also(validateTasksAndFinallySection(ps)) - errs = errs.Also(validateFinalTasks(ps.Finally)) + errs = errs.Also(validateFinalTasks(ps.Tasks, ps.Finally)) errs = errs.Also(validateWhenExpressions(ps.Tasks)) return errs } @@ -420,7 +420,7 @@ func validateTasksAndFinallySection(ps *PipelineSpec) *apis.FieldError { return nil } -func validateFinalTasks(finalTasks []PipelineTask) *apis.FieldError { +func validateFinalTasks(tasks []PipelineTask, finalTasks []PipelineTask) *apis.FieldError { for idx, f := range finalTasks { if len(f.RunAfter) != 0 { return apis.ErrInvalidValue(fmt.Sprintf("no runAfter allowed under spec.finally, final task %s has runAfter specified", f.Name), "").ViaFieldIndex("finally", idx) @@ -433,7 +433,10 @@ func validateFinalTasks(finalTasks []PipelineTask) *apis.FieldError { } } - if err := validateTaskResultReferenceNotUsed(finalTasks).ViaField("finally"); err != nil { + ts := PipelineTaskList(tasks).Names() + fts := PipelineTaskList(finalTasks).Names() + + if err := validateTaskResultReference(finalTasks, ts, fts).ViaField("finally"); err != nil { return err } @@ -444,14 +447,22 @@ func validateFinalTasks(finalTasks []PipelineTask) *apis.FieldError { return nil } -func validateTaskResultReferenceNotUsed(tasks []PipelineTask) *apis.FieldError { - for idx, t := range tasks { +func validateTaskResultReference(finalTasks []PipelineTask, ts, fts sets.String) *apis.FieldError { + for idx, t := range finalTasks { for _, p := range t.Params { expressions, ok := GetVarSubstitutionExpressionsForParam(p) if ok { if LooksLikeContainsResultRefs(expressions) { - return apis.ErrInvalidValue(fmt.Sprintf("no task result allowed under params,"+ - "final task param %s has set task result as its value", p.Name), "params").ViaIndex(idx) + resultRefs := NewResultRefs(expressions) + for _, resultRef := range resultRefs { + if fts.Has(resultRef.PipelineTask) { + return apis.ErrInvalidValue(fmt.Sprintf("invalid task result reference, "+ + "final task param %s has task result reference from a final task", p.Name), "params").ViaIndex(idx) + } else if !ts.Has(resultRef.PipelineTask) { + return apis.ErrInvalidValue(fmt.Sprintf("invalid task result reference, "+ + "final task param %s has task result reference from a task which is not defined in the pipeline", p.Name), "params").ViaIndex(idx) + } + } } } } diff --git a/pkg/apis/pipeline/v1beta1/pipeline_validation_test.go b/pkg/apis/pipeline/v1beta1/pipeline_validation_test.go index b4fa5a25bba..a97d47c8c80 100644 --- a/pkg/apis/pipeline/v1beta1/pipeline_validation_test.go +++ b/pkg/apis/pipeline/v1beta1/pipeline_validation_test.go @@ -1614,6 +1614,24 @@ func TestValidatePipelineWithFinalTasks_Success(t *testing.T) { }}, }, }, + }, { + name: "valid pipeline with final tasks referring to task results from a dag task", + p: &Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, + Spec: PipelineSpec{ + Tasks: []PipelineTask{{ + Name: "non-final-task", + TaskRef: &TaskRef{Name: "non-final-task"}, + }}, + Finally: []PipelineTask{{ + Name: "final-task-1", + TaskRef: &TaskRef{Name: "final-task"}, + Params: []Param{{ + Name: "param1", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.non-final-task.results.output)"}, + }}, + }}, + }, + }, }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1918,6 +1936,7 @@ func TestValidateTasksAndFinallySection_Failure(t *testing.T) { func TestValidateFinalTasks_Failure(t *testing.T) { tests := []struct { name string + tasks []PipelineTask finalTasks []PipelineTask expectedError apis.FieldError }{{ @@ -1960,16 +1979,32 @@ func TestValidateFinalTasks_Failure(t *testing.T) { Paths: []string{"finally[0].resources.inputs[0]"}, }, }, { - name: "invalid pipeline with final tasks having reference to task results", + name: "invalid pipeline with final tasks having task results reference from a final task", + finalTasks: []PipelineTask{{ + Name: "final-task-1", + TaskRef: &TaskRef{Name: "final-task"}, + }, { + Name: "final-task-2", + TaskRef: &TaskRef{Name: "final-task"}, + Params: []Param{{ + Name: "param1", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.final-task-1.results.output)"}, + }}, + }}, + expectedError: apis.FieldError{ + Message: `invalid value: invalid task result reference, final task param param1 has task result reference from a final task`, + Paths: []string{"finally[1].params"}, + }, + }, { + name: "invalid pipeline with final tasks having task results reference from non existent dag task", finalTasks: []PipelineTask{{ Name: "final-task", TaskRef: &TaskRef{Name: "final-task"}, Params: []Param{{ - Name: "param1", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.a-task.results.output)"}, + Name: "param1", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.no-dag-task-1.results.output)"}, }}, }}, expectedError: apis.FieldError{ - Message: `invalid value: no task result allowed under params,final task param param1 has set task result as its value`, + Message: `invalid value: invalid task result reference, final task param param1 has task result reference from a task which is not defined in the pipeline`, Paths: []string{"finally[0].params"}, }, }, { @@ -1990,7 +2025,7 @@ func TestValidateFinalTasks_Failure(t *testing.T) { }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := validateFinalTasks(tt.finalTasks) + err := validateFinalTasks(tt.tasks, tt.finalTasks) if err == nil { t.Errorf("Pipeline.ValidateFinalTasks() did not return error for invalid pipeline") } diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index f719e7fb9d7..396974e4c42 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -581,7 +581,18 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip if len(fnextRprts) != 0 { // apply the runtime context just before creating taskRuns for final tasks in queue resources.ApplyPipelineTaskContext(fnextRprts, pipelineRunFacts.GetPipelineTaskStatus(ctx)) - nextRprts = append(nextRprts, fnextRprts...) + + // Before creating TaskRun for scheduled final task, check if it's consuming a task result + // Resolve and apply task result wherever applicable, report warning in case resolution fails + for _, rprt := range fnextRprts { + resolvedResultRefs, err := resources.ResolveResultRef(pipelineRunFacts.State, rprt) + if err != nil { + logger.Infof("Final task %q is not executed as it could not resolve task params for %q: %v", rprt.PipelineTask.Name, pr.Name, err) + continue + } + resources.ApplyTaskResults(resources.PipelineRunState{rprt}, resolvedResultRefs) + nextRprts = append(nextRprts, rprt) + } } for _, rprt := range nextRprts { diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 4856c2006de..2e50e91b401 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -4644,6 +4644,187 @@ func TestReconciler_ReconcileKind_PipelineTaskContext(t *testing.T) { } } +func TestReconcileWithTaskResultsInFinalTasks(t *testing.T) { + names.TestingSeed() + + ps := []*v1beta1.Pipeline{{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline", Namespace: "foo"}, + Spec: v1beta1.PipelineSpec{ + Tasks: []v1beta1.PipelineTask{{ + Name: "dag-task-1", + TaskRef: &v1beta1.TaskRef{Name: "dag-task"}, + }, { + Name: "dag-task-2", + TaskRef: &v1beta1.TaskRef{Name: "dag-task"}, + }}, + Finally: []v1beta1.PipelineTask{{ + Name: "final-task-1", + TaskRef: &v1beta1.TaskRef{Name: "final-task"}, + Params: []v1beta1.Param{{ + Name: "finalParam", + Value: v1beta1.ArrayOrString{ + Type: "string", + StringVal: "$(tasks.dag-task-1.results.aResult)", + }}, + }, + }, { + Name: "final-task-2", + TaskRef: &v1beta1.TaskRef{Name: "final-task"}, + Params: []v1beta1.Param{{ + Name: "finalParam", + Value: v1beta1.ArrayOrString{ + Type: "string", + StringVal: "$(tasks.dag-task-2.results.aResult)", + }}, + }, + }}, + }, + }} + + prs := []*v1beta1.PipelineRun{{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline-run-final-task-results", Namespace: "foo"}, + Spec: v1beta1.PipelineRunSpec{ + PipelineRef: &v1beta1.PipelineRef{Name: "test-pipeline"}, + ServiceAccountName: "test-sa-0", + }, + }} + + ts := []*v1beta1.Task{{ + ObjectMeta: metav1.ObjectMeta{Name: "dag-task", Namespace: "foo"}, + }, { + ObjectMeta: metav1.ObjectMeta{Name: "final-task", Namespace: "foo"}, + Spec: v1beta1.TaskSpec{ + Params: []v1beta1.ParamSpec{{ + Name: "finalParam", + Type: v1beta1.ParamTypeString, + }}, + }, + }} + + trs := []*v1beta1.TaskRun{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pipeline-run-final-task-results-dag-task-1-xxyyy", + Namespace: "foo", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "PipelineRun", + Name: "test-pipeline-run-final-task-results", + APIVersion: "tekton.dev/v1beta1", + }}, + Labels: map[string]string{ + "tekton.dev/pipeline": "test-pipeline", + "tekton.dev/pipelineRun": "test-pipeline-run-final-task-results", + "tekton.dev/pipelineTask": "dag-task-1", + }, + }, + Spec: v1beta1.TaskRunSpec{ + ServiceAccountName: "test-sa", + TaskRef: &v1beta1.TaskRef{Name: "hello-world"}, + }, + Status: v1beta1.TaskRunStatus{ + Status: duckv1beta1.Status{ + Conditions: duckv1beta1.Conditions{apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + Reason: v1beta1.PipelineRunReasonSuccessful.String(), + }}, + }, + TaskRunStatusFields: v1beta1.TaskRunStatusFields{ + TaskRunResults: []v1beta1.TaskRunResult{{Name: "aResult", Value: "aResultValue"}}, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pipeline-run-final-task-results-dag-task-2-xxyyy", + Namespace: "foo", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "PipelineRun", + Name: "test-pipeline-run-final-task-results", + APIVersion: "tekton.dev/v1beta1", + }}, + Labels: map[string]string{ + "tekton.dev/pipeline": "test-pipeline", + "tekton.dev/pipelineRun": "test-pipeline-run-final-task-results", + "tekton.dev/pipelineTask": "dag-task-2", + }, + }, + Spec: v1beta1.TaskRunSpec{ + ServiceAccountName: "test-sa", + TaskRef: &v1beta1.TaskRef{Name: "hello-world"}, + }, + Status: v1beta1.TaskRunStatus{ + Status: duckv1beta1.Status{ + Conditions: duckv1beta1.Conditions{apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }}, + }, + }, + }} + + d := test.Data{ + PipelineRuns: prs, + Pipelines: ps, + Tasks: ts, + TaskRuns: trs, + } + prt := NewPipelineRunTest(d, t) + defer prt.Cancel() + + reconciledRun, clients := prt.reconcileRun("foo", "test-pipeline-run-final-task-results", []string{}, false) + + expectedTaskRunName := "test-pipeline-run-final-task-results-final-task-1-9l9zj" + expectedTaskRun := v1beta1.TaskRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: expectedTaskRunName, + Namespace: "foo", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "PipelineRun", + Name: "test-pipeline-run-final-task-results", + APIVersion: "tekton.dev/v1beta1", + Controller: &trueb, + BlockOwnerDeletion: &trueb, + }}, + Labels: map[string]string{ + "tekton.dev/pipeline": "test-pipeline", + "tekton.dev/pipelineRun": "test-pipeline-run-final-task-results", + "tekton.dev/pipelineTask": "final-task-1", + }, + Annotations: map[string]string{}, + }, + Spec: v1beta1.TaskRunSpec{ + ServiceAccountName: "test-sa-0", + TaskRef: &v1beta1.TaskRef{Name: "final-task"}, + Params: []v1beta1.Param{{Name: "finalParam", Value: v1beta1.ArrayOrString{Type: "string", StringVal: "aResultValue"}}}, + Resources: &v1beta1.TaskRunResources{}, + Timeout: &metav1.Duration{Duration: config.DefaultTimeoutMinutes * time.Minute}, + }, + } + + // Check that the expected TaskRun was created + actual, err := clients.Pipeline.TektonV1beta1().TaskRuns("foo").List(prt.TestAssets.Ctx, metav1.ListOptions{ + LabelSelector: "tekton.dev/pipelineTask=final-task-1,tekton.dev/pipelineRun=test-pipeline-run-final-task-results", + Limit: 1, + }) + + if err != nil { + t.Fatalf("Failure to list TaskRun's %s", err) + } + if len(actual.Items) != 1 { + t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items)) + } + actualTaskRun := actual.Items[0] + if d := cmp.Diff(expectedTaskRun, actualTaskRun, ignoreResourceVersion); d != "" { + t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRunName, diff.PrintWantGot(d)) + } + expectedSkippedTasks := []v1beta1.SkippedTask{{ + Name: "final-task-2", + }} + + if d := cmp.Diff(expectedSkippedTasks, reconciledRun.Status.SkippedTasks); d != "" { + t.Fatalf("Expected to see only one final task (final-task-2) in the list of skipped tasks. Diff: %s", diff.PrintWantGot(d)) + } +} + // NewPipelineRunTest returns PipelineRunTest with a new PipelineRun controller created with specified state through data // This PipelineRunTest can be reused for multiple PipelineRuns by calling reconcileRun for each pipelineRun func NewPipelineRunTest(data test.Data, t *testing.T) *PipelineRunTest { diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go index baacbd66f57..5164b3405ea 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go @@ -216,6 +216,19 @@ func (t *ResolvedPipelineRunTask) parentTasksSkip(facts *PipelineRunFacts) bool return false } +// IsFinallySkipped returns true if a finally task is not executed and skipped due to task result validation failure +func (t *ResolvedPipelineRunTask) IsFinallySkipped(facts *PipelineRunFacts) bool { + if t.IsStarted() { + return false + } + if facts.checkDAGTasksDone() && facts.isFinalTask(t.PipelineTask.Name) { + if _, err := ResolveResultRef(facts.State, t); err != nil { + return true + } + } + return false +} + // GetRun is a function that will retrieve a Run by name. type GetRun func(name string) (*v1alpha1.Run, error) diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go index a123d9874d6..b6547faca0a 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go @@ -120,6 +120,10 @@ var pts = []v1beta1.PipelineTask{{ }, { Name: "mytask14", TaskRef: &v1beta1.TaskRef{APIVersion: "example.dev/v0", Kind: "Example", Name: "customtask"}, +}, { + Name: "mytask15", + TaskRef: &v1beta1.TaskRef{Name: "taskWithReferenceToTaskResult"}, + Params: []v1beta1.Param{{Name: "param1", Value: *v1beta1.NewArrayOrString("$(tasks.mytask1.results.result1)")}}, }} var p = &v1beta1.Pipeline{ @@ -2205,3 +2209,67 @@ func TestIsCustomTask(t *testing.T) { }) } } + +func TestResolvedPipelineRunTask_IsFinallySkipped(t *testing.T) { + tr := tb.TaskRun("dag-task", tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + tb.TaskRunResult("commit", "SHA2"), + )) + + state := PipelineRunState{{ + TaskRunName: "dag-task", + TaskRun: tr, + PipelineTask: &v1beta1.PipelineTask{ + Name: "dag-task", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + }, { + PipelineTask: &v1beta1.PipelineTask{ + Name: "final-task-1", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + Params: []v1beta1.Param{{ + Name: "commit", + Value: *v1beta1.NewArrayOrString("$(tasks.dag-task.results.commit)"), + }}, + }, + }, { + PipelineTask: &v1beta1.PipelineTask{ + Name: "final-task-2", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + Params: []v1beta1.Param{{ + Name: "commit", + Value: *v1beta1.NewArrayOrString("$(tasks.dag-task.results.missingResult)"), + }}, + }, + }} + + tasks := v1beta1.PipelineTaskList([]v1beta1.PipelineTask{*state[0].PipelineTask}) + d, err := dag.Build(tasks, tasks.Deps()) + if err != nil { + t.Fatalf("Could not get a dag from the dag tasks %#v: %v", state[0], err) + } + + // build graph with finally tasks + pts := []v1beta1.PipelineTask{*state[1].PipelineTask, *state[2].PipelineTask} + dfinally, err := dag.Build(v1beta1.PipelineTaskList(pts), map[string][]string{}) + if err != nil { + t.Fatalf("Could not get a dag from the finally tasks %#v: %v", pts, err) + } + + facts := &PipelineRunFacts{ + State: state, + TasksGraph: d, + FinalTasksGraph: dfinally, + } + + if state[1].IsFinallySkipped(facts) { + t.Fatalf("Didn't expect the finally task with a valid result reference to be skipped but it was skipped.") + } + + if !state[2].IsFinallySkipped(facts) { + t.Fatal("Expected the finally task with an invalid result reference to be skipped but it was not skipped.") + } +} diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go index a6a13721657..4378f994401 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go @@ -376,6 +376,12 @@ func (facts *PipelineRunFacts) GetSkippedTasks() []v1beta1.SkippedTask { } skipped = append(skipped, skippedTask) } + if rprt.IsFinallySkipped(facts) { + skippedTask := v1beta1.SkippedTask{ + Name: rprt.PipelineTask.Name, + } + skipped = append(skipped, skippedTask) + } } return skipped } @@ -465,6 +471,9 @@ func (facts *PipelineRunFacts) getPipelineTasksCount() pipelineRunStatusCount { // increment skip counter since the task is skipped case t.Skip(facts): s.Skipped++ + // checking if any finally tasks were referring to invalid/missing task results + case t.IsFinallySkipped(facts): + s.Skipped++ // increment incomplete counter since the task is pending and not executed yet default: s.Incomplete++ diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go index 088d7d5b2df..6ce12dea69e 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go @@ -1105,6 +1105,15 @@ func TestGetPipelineConditionStatus_WithFinalTasks(t *testing.T) { TaskRun: makeFailed(trs[0]), }} + // pipeline state with one DAG failed, one final task skipped + dagFailedFinalSkipped := PipelineRunState{{ + TaskRunName: "task0taskrun", + PipelineTask: &pts[0], + TaskRun: makeFailed(trs[0]), + }, { + PipelineTask: &pts[14], + }} + tcs := []struct { name string state PipelineRunState @@ -1153,6 +1162,18 @@ func TestGetPipelineConditionStatus_WithFinalTasks(t *testing.T) { expectedSkipped: 0, expectedFailed: 2, expectedCancelled: 0, + }, { + name: "pipeline with one failed DAG task and skipped final task", + state: dagFailedFinalSkipped, + dagTasks: []v1beta1.PipelineTask{pts[0]}, + finalTasks: []v1beta1.PipelineTask{pts[14]}, + expectedStatus: corev1.ConditionFalse, + expectedReason: v1beta1.PipelineRunReasonFailed.String(), + expectedSucceeded: 0, + expectedIncomplete: 0, + expectedSkipped: 1, + expectedFailed: 1, + expectedCancelled: 0, }} for _, tc := range tcs { @@ -1439,3 +1460,31 @@ func TestPipelineRunFacts_GetPipelineTaskStatus(t *testing.T) { }) } } + +func TestPipelineRunFacts_GetSkippedTasks(t *testing.T) { + dagFailedFinalSkipped := PipelineRunState{{ + TaskRunName: "task0taskrun", + PipelineTask: &pts[0], + TaskRun: makeFailed(trs[0]), + }, { + PipelineTask: &pts[14], + }} + expectedSkippedTasks := []v1beta1.SkippedTask{{Name: pts[14].Name}} + d, err := dag.Build(v1beta1.PipelineTaskList{pts[0]}, v1beta1.PipelineTaskList{pts[0]}.Deps()) + if err != nil { + t.Fatalf("Unexpected error while building graph for DAG tasks %v: %v", v1beta1.PipelineTaskList{pts[0]}, err) + } + df, err := dag.Build(v1beta1.PipelineTaskList{pts[14]}, map[string][]string{}) + if err != nil { + t.Fatalf("Unexpected error while building graph for final tasks %v: %v", v1beta1.PipelineTaskList{pts[14]}, err) + } + facts := PipelineRunFacts{ + State: dagFailedFinalSkipped, + TasksGraph: d, + FinalTasksGraph: df, + } + actualSkippedTasks := facts.GetSkippedTasks() + if d := cmp.Diff(actualSkippedTasks, expectedSkippedTasks); d != "" { + t.Fatalf("Mismatch skipped tasks %s", diff.PrintWantGot(d)) + } +} diff --git a/pkg/reconciler/pipelinerun/resources/resultrefresolution.go b/pkg/reconciler/pipelinerun/resources/resultrefresolution.go index fd8c6b51dfd..ac7c73b96c7 100644 --- a/pkg/reconciler/pipelinerun/resources/resultrefresolution.go +++ b/pkg/reconciler/pipelinerun/resources/resultrefresolution.go @@ -37,6 +37,15 @@ type ResolvedResultRef struct { FromRun string } +// ResolveResultRef resolves any ResultReference that are found in the target ResolvedPipelineRunTask +func ResolveResultRef(pipelineRunState PipelineRunState, target *ResolvedPipelineRunTask) (ResolvedResultRefs, error) { + resolvedResultRefs, err := convertToResultRefs(pipelineRunState, target) + if err != nil { + return nil, err + } + return resolvedResultRefs, nil +} + // ResolveResultRefs resolves any ResultReference that are found in the target ResolvedPipelineRunTask func ResolveResultRefs(pipelineRunState PipelineRunState, targets PipelineRunState) (ResolvedResultRefs, error) { var allResolvedResultRefs ResolvedResultRefs diff --git a/test/pipelinefinally_test.go b/test/pipelinefinally_test.go index 7cd2549d479..2eaecefeb32 100644 --- a/test/pipelinefinally_test.go +++ b/test/pipelinefinally_test.go @@ -17,8 +17,12 @@ package test import ( "context" + "strings" "testing" + "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/test/diff" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources" @@ -32,7 +36,7 @@ import ( "knative.dev/pkg/test/helpers" ) -func TestPipelineLevelFinally_OneDAGTaskFailed_Failure(t *testing.T) { +func TestPipelineLevelFinally_OneDAGTaskFailed_InvalidTaskResult_Failure(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -55,8 +59,8 @@ func TestPipelineLevelFinally_OneDAGTaskFailed_Failure(t *testing.T) { t.Fatalf("Failed to create dag Task: %s", err) } - finalTask := getSuccessTask(t, namespace) - if _, err := c.TaskClient.Create(ctx, finalTask, metav1.CreateOptions{}); err != nil { + successTask := getSuccessTask(t, namespace) + if _, err := c.TaskClient.Create(ctx, successTask, metav1.CreateOptions{}); err != nil { t.Fatalf("Failed to create final Task: %s", err) } @@ -65,22 +69,49 @@ func TestPipelineLevelFinally_OneDAGTaskFailed_Failure(t *testing.T) { t.Fatalf("Failed to create final Task checking executing status: %s", err) } - pipeline := getPipeline(t, - namespace, - map[string]string{ - "dagtask1": task.Name, - "dagtask2": delayedTask.Name, - "dagtask3": finalTask.Name, + taskProducingResult := getSuccessTaskProducingResults(t, namespace) + if _, err := c.TaskClient.Create(ctx, taskProducingResult, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Task producing task results: %s", err) + } + + taskConsumingResult := getSuccessTaskConsumingResults(t, namespace) + if _, err := c.TaskClient.Create(ctx, taskConsumingResult, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Task consuming task results: %s", err) + } + + we := v1beta1.WhenExpressions{{ + Input: "foo", + Operator: "notin", + Values: []string{"foo"}, + }} + + dag := map[string]pipelineTask{ + "dagtask1": { + TaskName: task.Name, + }, + "dagtask2": { + TaskName: delayedTask.Name, }, - map[string]string{ - "dagtask3": cond.Name, + "dagtask3": { + TaskName: successTask.Name, + Condition: cond.Name, }, - map[string]string{ - "finaltask1": finalTask.Name, - "finaltask2": finalTaskWithStatus.Name, + "dagtask4": { + TaskName: successTask.Name, + When: we, }, - map[string][]v1beta1.Param{ - "finaltask2": {{ + "dagtask5": { + TaskName: taskProducingResult.Name, + }, + } + + f := map[string]pipelineTask{ + "finaltask1": { + TaskName: successTask.Name, + }, + "finaltask2": { + TaskName: finalTaskWithStatus.Name, + Param: []v1beta1.Param{{ Name: "dagtask1-status", Value: v1beta1.ArrayOrString{ Type: "string", @@ -100,7 +131,42 @@ func TestPipelineLevelFinally_OneDAGTaskFailed_Failure(t *testing.T) { }, }}, }, - ) + // final task consuming result from a failed dag task + "finaltaskconsumingdagtask1": { + TaskName: taskConsumingResult.Name, + Param: []v1beta1.Param{{ + Name: "dagtask-result", + Value: v1beta1.ArrayOrString{ + Type: "string", + StringVal: "$(tasks.dagtask1.results.result)", + }, + }}, + }, + // final task consuming result from a skipped dag task due to when expression + "finaltaskconsumingdagtask4": { + TaskName: taskConsumingResult.Name, + Param: []v1beta1.Param{{ + Name: "dagtask-result", + Value: v1beta1.ArrayOrString{ + Type: "string", + StringVal: "$(tasks.dagtask4.results.result)", + }, + }}, + }, + "finaltaskconsumingdagtask5": { + TaskName: taskConsumingResult.Name, + Param: []v1beta1.Param{{ + Name: "dagtask-result", + Value: v1beta1.ArrayOrString{ + Type: "string", + StringVal: "$(tasks.dagtask5.results.result)", + }, + }}, + }, + } + + pipeline := getPipeline(t, namespace, dag, f) + if _, err := c.PipelineClient.Create(ctx, pipeline, metav1.CreateOptions{}); err != nil { t.Fatalf("Failed to create Pipeline: %s", err) } @@ -114,11 +180,26 @@ func TestPipelineLevelFinally_OneDAGTaskFailed_Failure(t *testing.T) { t.Fatalf("Waiting for PipelineRun %s to fail: %v", pipelineRun.Name, err) } + // Get the status of the PipelineRun. + pr, err := c.PipelineRunClient.Get(ctx, pipelineRun.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get PipelineRun %q: %v", pipelineRun.Name, err) + } + taskrunList, err := c.TaskRunClient.List(ctx, metav1.ListOptions{LabelSelector: "tekton.dev/pipelineRun=" + pipelineRun.Name}) if err != nil { t.Fatalf("Error listing TaskRuns for PipelineRun %s: %s", pipelineRun.Name, err) } + // expecting taskRuns for dagtask1, dagtask2, dagtask3 (with condition failure), dagtask5, finaltask1, finaltask2, and finaltaskconsumingdagtask5 + if len(taskrunList.Items) != 7 { + var s []string + for _, n := range taskrunList.Items { + s = append(s, n.Labels["tekton.dev/pipelineTask"]) + } + t.Fatalf("Error retrieving TaskRuns for PipelineRun %s. Expected 5 taskRuns and found taskRuns for: %s", pipelineRun.Name, strings.Join(s, ", ")) + } + var dagTask1EndTime, dagTask2EndTime, finalTaskStartTime *metav1.Time // verify dag task failed, parallel dag task succeeded, and final task succeeded for _, taskrunItem := range taskrunList.Items { @@ -137,6 +218,12 @@ func TestPipelineLevelFinally_OneDAGTaskFailed_Failure(t *testing.T) { if !isSkipped(t, n, taskrunItem.Status.Conditions) { t.Fatalf("dag task %s should have skipped due to condition failure", n) } + case n == "dagtask4": + t.Fatalf("task %s should have skipped due to when expression", n) + case n == "dagtask5": + if err := WaitForTaskRunState(ctx, c, taskrunItem.Name, TaskRunSucceed(taskrunItem.Name), "TaskRunSuccess"); err != nil { + t.Errorf("Error waiting for TaskRun to succeed: %v", err) + } case n == "finaltask1": if err := WaitForTaskRunState(ctx, c, taskrunItem.Name, TaskRunSucceed(taskrunItem.Name), "TaskRunSuccess"); err != nil { t.Errorf("Error waiting for TaskRun to succeed: %v", err) @@ -163,14 +250,45 @@ func TestPipelineLevelFinally_OneDAGTaskFailed_Failure(t *testing.T) { } } } + case n == "finaltaskconsumingdagtask5": + if err := WaitForTaskRunState(ctx, c, taskrunItem.Name, TaskRunSucceed(taskrunItem.Name), "TaskRunSuccess"); err != nil { + t.Errorf("Error waiting for TaskRun to succeed: %v", err) + } + for _, p := range taskrunItem.Spec.Params { + if p.Name == "dagtask-result" && p.Value.StringVal != "Hello" { + t.Errorf("Error resolving task result reference in a finally task %s", n) + } + } + case n == "finaltaskconsumingdagtask1" || n == "finaltaskconsumingdagtask4": + t.Fatalf("final task %s should have skipped due to missing task result reference", n) default: - t.Fatalf("TaskRuns were not found for both final and dag tasks") + t.Fatalf("Found unexpected taskRun %s", n) } } // final task should start executing after dagtask1 fails and dagtask2 is done if finalTaskStartTime.Before(dagTask1EndTime) || finalTaskStartTime.Before(dagTask2EndTime) { t.Fatalf("Final Tasks should start getting executed after all DAG tasks finishes") } + + // two final tasks referring to results must be skipped + // finaltaskconsumingdagtask1 has a reference to a task result from failed task + // finaltaskconsumingdagtask4 has a reference to a task result from skipped task with when expression + expectedSkippedTasks := []v1beta1.SkippedTask{{ + Name: "dagtask3", + }, { + Name: "dagtask4", + WhenExpressions: we, + }, { + Name: "finaltaskconsumingdagtask1", + }, { + Name: "finaltaskconsumingdagtask4", + }} + + if d := cmp.Diff(pr.Status.SkippedTasks, expectedSkippedTasks); d != "" { + t.Fatalf("Expected four skipped tasks, dag task with condition failure dagtask3, dag task with when expression,"+ + "two final tasks with missing result reference finaltaskconsumingdagtask1 and finaltaskconsumingdagtask4 in SkippedTasks."+ + " Diff: %s", diff.PrintWantGot(d)) + } } func TestPipelineLevelFinally_OneFinalTaskFailed_Failure(t *testing.T) { @@ -191,17 +309,15 @@ func TestPipelineLevelFinally_OneFinalTaskFailed_Failure(t *testing.T) { t.Fatalf("Failed to create final Task: %s", err) } - pipeline := getPipeline(t, - namespace, - map[string]string{ - "dagtask1": task.Name, - }, - map[string]string{}, - map[string]string{ - "finaltask1": finalTask.Name, - }, - map[string][]v1beta1.Param{}, - ) + pt := map[string]pipelineTask{ + "dagtask1": {TaskName: task.Name}, + } + + fpt := map[string]pipelineTask{ + "finaltask1": {TaskName: finalTask.Name}, + } + + pipeline := getPipeline(t, namespace, pt, fpt) if _, err := c.PipelineClient.Create(ctx, pipeline, metav1.CreateOptions{}); err != nil { t.Fatalf("Failed to create Pipeline: %s", err) } @@ -264,7 +380,7 @@ func isSkipped(t *testing.T, taskRunName string, conds duckv1beta1.Conditions) b return false } -func getTaskDef(n, namespace, script string, params []v1beta1.ParamSpec) *v1beta1.Task { +func getTaskDef(n, namespace, script string, params []v1beta1.ParamSpec, results []v1beta1.TaskResult) *v1beta1.Task { return &v1beta1.Task{ ObjectMeta: metav1.ObjectMeta{Name: n, Namespace: namespace}, Spec: v1beta1.TaskSpec{ @@ -272,21 +388,22 @@ func getTaskDef(n, namespace, script string, params []v1beta1.ParamSpec) *v1beta Container: corev1.Container{Image: "alpine"}, Script: script, }}, - Params: params, + Params: params, + Results: results, }, } } func getSuccessTask(t *testing.T, namespace string) *v1beta1.Task { - return getTaskDef(helpers.ObjectNameForTest(t), namespace, "exit 0", []v1beta1.ParamSpec{}) + return getTaskDef(helpers.ObjectNameForTest(t), namespace, "exit 0", []v1beta1.ParamSpec{}, []v1beta1.TaskResult{}) } func getFailTask(t *testing.T, namespace string) *v1beta1.Task { - return getTaskDef(helpers.ObjectNameForTest(t), namespace, "exit 1", []v1beta1.ParamSpec{}) + return getTaskDef(helpers.ObjectNameForTest(t), namespace, "exit 1", []v1beta1.ParamSpec{}, []v1beta1.TaskResult{}) } func getDelaySuccessTask(t *testing.T, namespace string) *v1beta1.Task { - return getTaskDef(helpers.ObjectNameForTest(t), namespace, "sleep 5; exit 0", []v1beta1.ParamSpec{}) + return getTaskDef(helpers.ObjectNameForTest(t), namespace, "sleep 5; exit 0", []v1beta1.ParamSpec{}, []v1beta1.TaskResult{}) } func getTaskVerifyingStatus(t *testing.T, namespace string) *v1beta1.Task { @@ -297,8 +414,15 @@ func getTaskVerifyingStatus(t *testing.T, namespace string) *v1beta1.Task { }, { Name: "dagtask3-status", }} - script := "exit 0" - return getTaskDef(helpers.ObjectNameForTest(t), namespace, script, params) + return getTaskDef(helpers.ObjectNameForTest(t), namespace, "exit 0", params, []v1beta1.TaskResult{}) +} + +func getSuccessTaskProducingResults(t *testing.T, namespace string) *v1beta1.Task { + return getTaskDef(helpers.ObjectNameForTest(t), namespace, "echo -n \"Hello\" > $(results.result.path)", []v1beta1.ParamSpec{}, []v1beta1.TaskResult{{Name: "result"}}) +} + +func getSuccessTaskConsumingResults(t *testing.T, namespace string) *v1beta1.Task { + return getTaskDef(helpers.ObjectNameForTest(t), namespace, "exit 0", []v1beta1.ParamSpec{{Name: "dagtask-result"}}, []v1beta1.TaskResult{}) } func getCondition(t *testing.T, namespace string) *v1alpha1.Condition { @@ -313,26 +437,37 @@ func getCondition(t *testing.T, namespace string) *v1alpha1.Condition { } } -func getPipeline(t *testing.T, namespace string, ts map[string]string, c map[string]string, f map[string]string, p map[string][]v1beta1.Param) *v1beta1.Pipeline { +type pipelineTask struct { + TaskName string + Condition string + Param []v1beta1.Param + When v1beta1.WhenExpressions +} + +func getPipeline(t *testing.T, namespace string, dag map[string]pipelineTask, f map[string]pipelineTask) *v1beta1.Pipeline { var pt []v1beta1.PipelineTask var fpt []v1beta1.PipelineTask - for k, v := range ts { + for k, v := range dag { task := v1beta1.PipelineTask{ Name: k, - TaskRef: &v1beta1.TaskRef{Name: v}, + TaskRef: &v1beta1.TaskRef{Name: v.TaskName}, } - if _, ok := c[k]; ok { + if v.Condition != "" { task.Conditions = []v1beta1.PipelineTaskCondition{{ - ConditionRef: c[k], + ConditionRef: v.Condition, }} } + if len(v.When) != 0 { + task.WhenExpressions = v.When + } pt = append(pt, task) } for k, v := range f { fpt = append(fpt, v1beta1.PipelineTask{ - Name: k, - TaskRef: &v1beta1.TaskRef{Name: v}, - Params: p[k], + Name: k, + + TaskRef: &v1beta1.TaskRef{Name: v.TaskName}, + Params: v.Param, }) } pipeline := &v1beta1.Pipeline{ diff --git a/test/yamls/v1beta1/pipelineruns/pipelinerun-with-final-tasks.yaml b/test/yamls/v1beta1/pipelineruns/pipelinerun-with-final-tasks.yaml index de4aaf6c52b..6a8a57d9400 100644 --- a/test/yamls/v1beta1/pipelineruns/pipelinerun-with-final-tasks.yaml +++ b/test/yamls/v1beta1/pipelineruns/pipelinerun-with-final-tasks.yaml @@ -164,6 +164,18 @@ spec: workspaces: - name: source workspace: git-source + - name: display-git-commit + params: + - name: commit + value: $(tasks.clone-app-repo.results.commit) + taskSpec: + params: + - name: commit + steps: + - name: display-commit + image: alpine + script: | + echo $(params.commit) --- # PipelineRun to execute pipeline - clone-into-workspace-and-cleanup-workspace