From 83e8c6397ca8c5b04c2864f82d1f7e65c8ea1c05 Mon Sep 17 00:00:00 2001 From: Priti Desai Date: Mon, 14 Sep 2020 10:43:19 -0700 Subject: [PATCH] enabling consuming task results in finally Final tasks can be configured to consume Results of PipelineTasks from tasks section --- docs/pipelines.md | 59 ++++---- .../pipeline/v1beta1/pipeline_validation.go | 19 --- .../v1beta1/pipeline_validation_test.go | 9 -- pkg/reconciler/pipeline/dag/dag.go | 21 ++- pkg/reconciler/pipelinerun/pipelinerun.go | 19 ++- .../pipelinerun/pipelinerun_test.go | 126 ++++++++++++++++++ .../resources/pipelinerunresolution.go | 3 + .../pipelinerun/resources/pipelinerunstate.go | 20 ++- .../resources/pipelinerunstate_test.go | 33 +++++ 9 files changed, 245 insertions(+), 64 deletions(-) diff --git a/docs/pipelines.md b/docs/pipelines.md index 204b0f3542b..64072e63855 100644 --- a/docs/pipelines.md +++ b/docs/pipelines.md @@ -688,6 +688,36 @@ spec: value: "someURL" ``` +#### Consuming `Task` execution results in `finally` + +Final tasks can be configured to consume `Results` of `PipelineTask` from `tasks` section: + +```yaml +spec: + tasks: + - name: count-comments-before + taskRef: + Name: count-comments + - name: add-comment + taskRef: + Name: add-comment + - name: count-comments-after + taskRef: + Name: count-comments + finally: + - name: check-count + taskRef: + Name: check-count + params: + - name: before-count + value: $(tasks.count-comments-before.results.count) #invalid + - name: after-count + value: $(tasks.count-comments-after.results.count) #invalid +``` +**Note** The scheduling of such final task does not change, it will still be executed in parallel with other +final tasks after all non-final tasks are done. + + ### `PipelineRun` Status with `finally` With `finally`, `PipelineRun` status is calculated based on `PipelineTasks` under `tasks` section and final tasks. @@ -765,35 +795,6 @@ no `runAfter` can be specified in final tasks. final tasks are guaranteed to be executed after all `PipelineTasks` therefore no `conditions` can be specified in final tasks. -#### Cannot configure `Task` execution results with `finally` - -Final tasks can not be configured to consume `Results` of `PipelineTask` from `tasks` section i.e. the following -example is not supported right now but we are working on adding support for the same (tracked in issue -[#2557](https://github.com/tektoncd/pipeline/issues/2557)). - -```yaml -spec: - tasks: - - name: count-comments-before - taskRef: - Name: count-comments - - name: add-comment - taskRef: - Name: add-comment - - name: count-comments-after - taskRef: - Name: count-comments - finally: - - name: check-count - taskRef: - Name: check-count - params: - - name: before-count - value: $(tasks.count-comments-before.results.count) #invalid - - name: after-count - value: $(tasks.count-comments-after.results.count) #invalid -``` - #### Cannot configure `Pipeline` result with `finally` Final tasks can emit `Results` but results emitted from the final tasks can not be configured in the diff --git a/pkg/apis/pipeline/v1beta1/pipeline_validation.go b/pkg/apis/pipeline/v1beta1/pipeline_validation.go index fac8294c2c5..11fd8e57758 100644 --- a/pkg/apis/pipeline/v1beta1/pipeline_validation.go +++ b/pkg/apis/pipeline/v1beta1/pipeline_validation.go @@ -472,10 +472,6 @@ func validateFinalTasks(finalTasks []PipelineTask) *apis.FieldError { } } - if err := validateTaskResultReferenceNotUsed(finalTasks); err != nil { - return err - } - if err := validateTasksInputFrom(finalTasks); err != nil { return err } @@ -483,21 +479,6 @@ func validateFinalTasks(finalTasks []PipelineTask) *apis.FieldError { return nil } -func validateTaskResultReferenceNotUsed(tasks []PipelineTask) *apis.FieldError { - for _, t := range tasks { - for _, p := range t.Params { - expressions, ok := GetVarSubstitutionExpressionsForParam(p) - if ok { - if LooksLikeContainsResultRefs(expressions) { - return apis.ErrInvalidValue(fmt.Sprintf("no task result allowed under params,"+ - "final task param %s has set task result as its value", p.Name), "spec.finally.task.params") - } - } - } - } - return nil -} - func validateTasksInputFrom(tasks []PipelineTask) *apis.FieldError { for _, t := range tasks { inputResources := []PipelineTaskInputResource{} diff --git a/pkg/apis/pipeline/v1beta1/pipeline_validation_test.go b/pkg/apis/pipeline/v1beta1/pipeline_validation_test.go index 30c5c34d1eb..6392ced0387 100644 --- a/pkg/apis/pipeline/v1beta1/pipeline_validation_test.go +++ b/pkg/apis/pipeline/v1beta1/pipeline_validation_test.go @@ -1524,15 +1524,6 @@ func TestValidateFinalTasks_Failure(t *testing.T) { }}, }, }}, - }, { - name: "invalid pipeline with final tasks having reference to task results", - finalTasks: []PipelineTask{{ - Name: "final-task", - TaskRef: &TaskRef{Name: "final-task"}, - Params: []Param{{ - Name: "param1", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.a-task.results.output)"}, - }}, - }}, }, { name: "invalid pipeline with final task specifying when expressions", finalTasks: []PipelineTask{{ diff --git a/pkg/reconciler/pipeline/dag/dag.go b/pkg/reconciler/pipeline/dag/dag.go index b8c2b1d711e..da8fe888ed4 100644 --- a/pkg/reconciler/pipeline/dag/dag.go +++ b/pkg/reconciler/pipeline/dag/dag.go @@ -66,18 +66,33 @@ func (g *Graph) addPipelineTask(t Task) (*Node, error) { return newNode, nil } -// Build returns a valid pipeline Graph. Returns error if the pipeline is invalid -func Build(tasks Tasks) (*Graph, error) { +// BuildWithoutLinks builds a Pipeline Graph with the specified tasks as nodes (without any links). +// Returns error if the pipeline is invalid +func BuildWithoutLinks(tasks Tasks) (*Graph, error) { d := newGraph() - deps := map[string][]string{} // Add all Tasks mentioned in the `PipelineSpec` for _, pt := range tasks.Items() { if _, err := d.addPipelineTask(pt); err != nil { return nil, fmt.Errorf("task %s is already present in Graph, can't add it again: %w", pt.HashKey(), err) } + } + return d, nil +} + +// Build returns a valid pipeline Graph with all the dependencies converted into appropriate links. +// Returns error if the pipeline is invalid +func Build(tasks Tasks) (*Graph, error) { + d, err := BuildWithoutLinks(tasks) + if err != nil { + return nil, err + } + + deps := map[string][]string{} + for _, pt := range tasks.Items() { deps[pt.HashKey()] = pt.Deps() } + // Process all from and runAfter constraints to add task dependency for pt, taskDeps := range deps { for _, previousTask := range taskDeps { diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 604e355afe1..9bf380d1e5b 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -307,7 +307,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err // if a task in PipelineRunState is final task or not // the finally section is optional and might not exist // dfinally holds an empty Graph in the absence of finally clause - dfinally, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Finally)) + dfinally, err := dag.BuildWithoutLinks(v1beta1.PipelineTaskList(pipelineSpec.Finally)) if err != nil { // This Run has failed, so we need to mark it as failed and stop reconciling it pr.Status.MarkFailed(ReasonInvalidGraph, @@ -495,7 +495,7 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip logger := logging.FromContext(ctx) recorder := controller.GetEventRecorder(ctx) - var nextRprts []*resources.ResolvedPipelineRunTask + var nextRprts resources.PipelineRunState // when pipeline run is stopping, do not schedule any new task and only // wait for all running tasks to complete and report their status @@ -521,7 +521,20 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip resources.ApplyTaskResults(nextRprts, resolvedResultRefs) // GetFinalTasks only returns tasks when a DAG is complete - nextRprts = append(nextRprts, pipelineRunState.GetFinalTasks(d, dfinally)...) + finallyRprts := pipelineRunState.GetFinalTasks(d, dfinally) + + // Before creating TaskRun for scheduled final task, check if it's consuming a task result + // Resolve and apply task result wherever applicable, report failure in case resolution fails + for _, rprt := range finallyRprts { + resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunState, resources.PipelineRunState{rprt}) + if err != nil { + rprt.InvalidTaskResultsInFinally = true + logger.Warnf("Declaring final task %q failed as it failed to resolve task params for %q with error %v", rprt.PipelineTask.Name, pr.Name, err) + continue + } + resources.ApplyTaskResults(resources.PipelineRunState{rprt}, resolvedResultRefs) + nextRprts = append(nextRprts, rprt) + } for _, rprt := range nextRprts { if rprt == nil || rprt.Skip(pipelineRunState, d) { diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 5f89923ecb8..16c8e20ca0e 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -3923,6 +3923,132 @@ func TestReconcilePipeline_TaskSpecMetadata(t *testing.T) { } } +func TestReconcileWithTaskResultsInFinalTasks(t *testing.T) { + names.TestingSeed() + ps := []*v1beta1.Pipeline{tb.Pipeline("test-pipeline", tb.PipelineNamespace("foo"), tb.PipelineSpec( + tb.PipelineTask("dag-task-1", "dag-task"), + tb.PipelineTask("dag-task-2", "dag-task"), + tb.FinalPipelineTask("final-task-1", "final-task-1", + tb.PipelineTaskParam("finalParam", "$(tasks.dag-task-1.results.aResult)"), + ), + tb.FinalPipelineTask("final-task-2", "final-task-2", + tb.PipelineTaskParam("finalParam", "$(tasks.dag-task-2.results.aResult)"), + ), + ))} + prs := []*v1beta1.PipelineRun{tb.PipelineRun("test-pipeline-run-final-task-results", tb.PipelineRunNamespace("foo"), + tb.PipelineRunSpec("test-pipeline", + tb.PipelineRunServiceAccountName("test-sa-0"), + ), + )} + ts := []*v1beta1.Task{ + tb.Task("dag-task", tb.TaskNamespace("foo")), + tb.Task("final-task-1", tb.TaskNamespace("foo"), + tb.TaskSpec( + tb.TaskParam("finalParam", v1beta1.ParamTypeString), + ), + ), + tb.Task("final-task-2", tb.TaskNamespace("foo"), + tb.TaskSpec( + tb.TaskParam("finalParam", v1beta1.ParamTypeString), + ), + ), + } + trs := []*v1beta1.TaskRun{ + tb.TaskRun("test-pipeline-run-final-task-results-dag-task-1-xxyyy", + tb.TaskRunNamespace("foo"), + tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results", + tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"), + tb.Controller, tb.BlockOwnerDeletion, + ), + tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"), + tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"), + tb.TaskRunLabel("tekton.dev/pipelineTask", "dag-task-1"), + tb.TaskRunSpec( + tb.TaskRunTaskRef("hello-world"), + tb.TaskRunServiceAccountName("test-sa"), + ), + tb.TaskRunStatus( + tb.StatusCondition( + apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }, + ), + tb.TaskRunResult("aResult", "aResultValue"), + ), + ), + tb.TaskRun("test-pipeline-run-final-task-results-dag-task-2-xxyyy", + tb.TaskRunNamespace("foo"), + tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results", + tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"), + tb.Controller, tb.BlockOwnerDeletion, + ), + tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"), + tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"), + tb.TaskRunLabel("tekton.dev/pipelineTask", "dag-task-2"), + tb.TaskRunSpec( + tb.TaskRunTaskRef("hello-world"), + tb.TaskRunServiceAccountName("test-sa"), + ), + tb.TaskRunStatus( + tb.StatusCondition( + apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }, + ), + ), + ), + } + d := test.Data{ + PipelineRuns: prs, + Pipelines: ps, + Tasks: ts, + TaskRuns: trs, + } + prt := NewPipelineRunTest(d, t) + defer prt.Cancel() + + reconciledRun, clients := prt.reconcileRun("foo", "test-pipeline-run-final-task-results", []string{}, false) + + expectedTaskRunName := "test-pipeline-run-final-task-results-final-task-1-9l9zj" + expectedTaskRun := tb.TaskRun(expectedTaskRunName, + tb.TaskRunNamespace("foo"), + tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results", + tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"), + tb.Controller, tb.BlockOwnerDeletion, + ), + tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"), + tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"), + tb.TaskRunLabel("tekton.dev/pipelineTask", "final-task-1"), + tb.TaskRunSpec( + tb.TaskRunTaskRef("final-task-1"), + tb.TaskRunServiceAccountName("test-sa-0"), + tb.TaskRunParam("finalParam", "aResultValue"), + ), + ) + // Check that the expected TaskRun was created + actual, err := clients.Pipeline.TektonV1beta1().TaskRuns("foo").List(metav1.ListOptions{ + LabelSelector: "tekton.dev/pipelineTask=final-task-1,tekton.dev/pipelineRun=test-pipeline-run-final-task-results", + Limit: 1, + }) + + if err != nil { + t.Fatalf("Failure to list TaskRun's %s", err) + } + if len(actual.Items) != 1 { + t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items)) + } + actualTaskRun := actual.Items[0] + if d := cmp.Diff(&actualTaskRun, expectedTaskRun, ignoreResourceVersion); d != "" { + t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRunName, diff.PrintWantGot(d)) + } + + if s := reconciledRun.Status.TaskRuns["test-pipeline-run-final-task-results-final-task-2-mz4c7"].Status.GetCondition(apis.ConditionSucceeded); s.Status != corev1.ConditionFalse { + t.Fatalf("Status expected to be %s but is %s", corev1.ConditionFalse, s.Status) + } +} + // NewPipelineRunTest returns PipelineRunTest with a new PipelineRun controller created with specified state through data // This PipelineRunTest can be reused for multiple PipelineRuns by calling reconcileRun for each pipelineRun func NewPipelineRunTest(data test.Data, t *testing.T) *PipelineRunTest { diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go index 03f6688b5f4..f5de046b220 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go @@ -69,6 +69,9 @@ type ResolvedPipelineRunTask struct { ResolvedTaskResources *resources.ResolvedTaskResources // ConditionChecks ~~TaskRuns but for evaling conditions ResolvedConditionChecks TaskConditionCheckState // Could also be a TaskRun or maybe just a Pod? + // this flag is set for a task when task result resolution fails for that task + // and mainly used to create a task run object with failure for that task + InvalidTaskResultsInFinally bool } func (t ResolvedPipelineRunTask) IsDone() bool { diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go index fc18949076b..81adf693ab2 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go @@ -28,6 +28,12 @@ import ( "knative.dev/pkg/apis" ) +const ( + // ReasonInvalidTaskResultsInFinallyTask indicates that the final task was not able to resolve + // task result of a DAG task + ReasonInvalidTaskResultsInFinallyTask = "InvalidTaskResultsInFinallyTask" +) + // PipelineRunState is a slice of ResolvedPipelineRunTasks the represents the current execution // state of the PipelineRun. type PipelineRunState []*ResolvedPipelineRunTask @@ -264,7 +270,7 @@ func (state PipelineRunState) GetSkippedTasks(pr *v1beta1.PipelineRun, d *dag.Gr func (state PipelineRunState) GetTaskRunsStatus(pr *v1beta1.PipelineRun) map[string]*v1beta1.PipelineRunTaskRunStatus { status := make(map[string]*v1beta1.PipelineRunTaskRunStatus) for _, rprt := range state { - if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil { + if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil && !rprt.InvalidTaskResultsInFinally { continue } @@ -305,6 +311,18 @@ func (state PipelineRunState) GetTaskRunsStatus(pr *v1beta1.PipelineRun) map[str }) } } + // Maintain a TaskRun Object in pr.Status for a finally task which could not resolve task results + if rprt.InvalidTaskResultsInFinally { + if prtrs.Status == nil { + prtrs.Status = &v1beta1.TaskRunStatus{} + } + prtrs.Status.SetCondition(&apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: ReasonInvalidTaskResultsInFinallyTask, + Message: fmt.Sprintf("Finally Task %s in PipelineRun %s was having invalid task results", rprt.PipelineTask.Name, pr.Name), + }) + } status[rprt.TaskRunName] = prtrs } return status diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go index 4bad1e02b50..54f35870354 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go @@ -1030,3 +1030,36 @@ func TestGetPipelineConditionStatus_PipelineTimeouts(t *testing.T) { t.Fatalf("Expected to get status %s but got %s for state %v", corev1.ConditionFalse, c.Status, oneFinishedState) } } + +func TestPipelineRunState_GetTaskRunsStatus_InvalidTaskResultsInFinally(t *testing.T) { + taskRunName := "test-pipeline-run-final-task-results-final-task" + taskName := "final-task" + state := PipelineRunState{{ + TaskRunName: taskRunName, + PipelineTask: &v1beta1.PipelineTask{ + Name: taskName, + TaskSpec: &v1beta1.EmbeddedTask{ + TaskSpec: &v1beta1.TaskSpec{ + Steps: []v1beta1.Step{{ + Container: corev1.Container{Image: "ubuntu"}, + Script: "echo 0", + }}, + }, + }, + }, + InvalidTaskResultsInFinally: true, + }} + + s := state.GetTaskRunsStatus(&v1beta1.PipelineRun{}) + if s[taskRunName] == nil { + t.Fatalf("Expected to get status for task %s but got nil", taskName) + } + + status := s[taskRunName].Status.GetCondition(apis.ConditionSucceeded) + if status.Status != corev1.ConditionFalse { + t.Fatalf("Status expected to be %s but is %s", corev1.ConditionFalse, status.Status) + } + if status.Reason != ReasonInvalidTaskResultsInFinallyTask { + t.Fatalf("Reason expected to be %s but is %s", ReasonInvalidTaskResultsInFinallyTask, status.Reason) + } +}