Skip to content

Commit

Permalink
enabling consuming task results in finally
Browse files Browse the repository at this point in the history
Final tasks can be configured to consume Results of PipelineTasks from
tasks section
  • Loading branch information
pritidesai committed Sep 16, 2020
1 parent 723fca9 commit 83e8c63
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 64 deletions.
59 changes: 30 additions & 29 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,36 @@ spec:
value: "someURL"
```

#### Consuming `Task` execution results in `finally`

Final tasks can be configured to consume `Results` of `PipelineTask` from `tasks` section:

```yaml
spec:
tasks:
- name: count-comments-before
taskRef:
Name: count-comments
- name: add-comment
taskRef:
Name: add-comment
- name: count-comments-after
taskRef:
Name: count-comments
finally:
- name: check-count
taskRef:
Name: check-count
params:
- name: before-count
value: $(tasks.count-comments-before.results.count) #invalid
- name: after-count
value: $(tasks.count-comments-after.results.count) #invalid
```
**Note** The scheduling of such final task does not change, it will still be executed in parallel with other
final tasks after all non-final tasks are done.


### `PipelineRun` Status with `finally`

With `finally`, `PipelineRun` status is calculated based on `PipelineTasks` under `tasks` section and final tasks.
Expand Down Expand Up @@ -765,35 +795,6 @@ no `runAfter` can be specified in final tasks.
final tasks are guaranteed to be executed after all `PipelineTasks` therefore no `conditions` can be specified in
final tasks.

#### Cannot configure `Task` execution results with `finally`

Final tasks can not be configured to consume `Results` of `PipelineTask` from `tasks` section i.e. the following
example is not supported right now but we are working on adding support for the same (tracked in issue
[#2557](https://github.com/tektoncd/pipeline/issues/2557)).

```yaml
spec:
tasks:
- name: count-comments-before
taskRef:
Name: count-comments
- name: add-comment
taskRef:
Name: add-comment
- name: count-comments-after
taskRef:
Name: count-comments
finally:
- name: check-count
taskRef:
Name: check-count
params:
- name: before-count
value: $(tasks.count-comments-before.results.count) #invalid
- name: after-count
value: $(tasks.count-comments-after.results.count) #invalid
```

#### Cannot configure `Pipeline` result with `finally`

Final tasks can emit `Results` but results emitted from the final tasks can not be configured in the
Expand Down
19 changes: 0 additions & 19 deletions pkg/apis/pipeline/v1beta1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,32 +472,13 @@ func validateFinalTasks(finalTasks []PipelineTask) *apis.FieldError {
}
}

if err := validateTaskResultReferenceNotUsed(finalTasks); err != nil {
return err
}

if err := validateTasksInputFrom(finalTasks); err != nil {
return err
}

return nil
}

func validateTaskResultReferenceNotUsed(tasks []PipelineTask) *apis.FieldError {
for _, t := range tasks {
for _, p := range t.Params {
expressions, ok := GetVarSubstitutionExpressionsForParam(p)
if ok {
if LooksLikeContainsResultRefs(expressions) {
return apis.ErrInvalidValue(fmt.Sprintf("no task result allowed under params,"+
"final task param %s has set task result as its value", p.Name), "spec.finally.task.params")
}
}
}
}
return nil
}

func validateTasksInputFrom(tasks []PipelineTask) *apis.FieldError {
for _, t := range tasks {
inputResources := []PipelineTaskInputResource{}
Expand Down
9 changes: 0 additions & 9 deletions pkg/apis/pipeline/v1beta1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1524,15 +1524,6 @@ func TestValidateFinalTasks_Failure(t *testing.T) {
}},
},
}},
}, {
name: "invalid pipeline with final tasks having reference to task results",
finalTasks: []PipelineTask{{
Name: "final-task",
TaskRef: &TaskRef{Name: "final-task"},
Params: []Param{{
Name: "param1", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.a-task.results.output)"},
}},
}},
}, {
name: "invalid pipeline with final task specifying when expressions",
finalTasks: []PipelineTask{{
Expand Down
21 changes: 18 additions & 3 deletions pkg/reconciler/pipeline/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,33 @@ func (g *Graph) addPipelineTask(t Task) (*Node, error) {
return newNode, nil
}

// Build returns a valid pipeline Graph. Returns error if the pipeline is invalid
func Build(tasks Tasks) (*Graph, error) {
// BuildWithoutLinks builds a Pipeline Graph with the specified tasks as nodes (without any links).
// Returns error if the pipeline is invalid
func BuildWithoutLinks(tasks Tasks) (*Graph, error) {
d := newGraph()

deps := map[string][]string{}
// Add all Tasks mentioned in the `PipelineSpec`
for _, pt := range tasks.Items() {
if _, err := d.addPipelineTask(pt); err != nil {
return nil, fmt.Errorf("task %s is already present in Graph, can't add it again: %w", pt.HashKey(), err)
}
}
return d, nil
}

// Build returns a valid pipeline Graph with all the dependencies converted into appropriate links.
// Returns error if the pipeline is invalid
func Build(tasks Tasks) (*Graph, error) {
d, err := BuildWithoutLinks(tasks)
if err != nil {
return nil, err
}

deps := map[string][]string{}
for _, pt := range tasks.Items() {
deps[pt.HashKey()] = pt.Deps()
}

// Process all from and runAfter constraints to add task dependency
for pt, taskDeps := range deps {
for _, previousTask := range taskDeps {
Expand Down
19 changes: 16 additions & 3 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
// if a task in PipelineRunState is final task or not
// the finally section is optional and might not exist
// dfinally holds an empty Graph in the absence of finally clause
dfinally, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Finally))
dfinally, err := dag.BuildWithoutLinks(v1beta1.PipelineTaskList(pipelineSpec.Finally))
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonInvalidGraph,
Expand Down Expand Up @@ -495,7 +495,7 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
logger := logging.FromContext(ctx)
recorder := controller.GetEventRecorder(ctx)

var nextRprts []*resources.ResolvedPipelineRunTask
var nextRprts resources.PipelineRunState

// when pipeline run is stopping, do not schedule any new task and only
// wait for all running tasks to complete and report their status
Expand All @@ -521,7 +521,20 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
resources.ApplyTaskResults(nextRprts, resolvedResultRefs)

// GetFinalTasks only returns tasks when a DAG is complete
nextRprts = append(nextRprts, pipelineRunState.GetFinalTasks(d, dfinally)...)
finallyRprts := pipelineRunState.GetFinalTasks(d, dfinally)

// Before creating TaskRun for scheduled final task, check if it's consuming a task result
// Resolve and apply task result wherever applicable, report failure in case resolution fails
for _, rprt := range finallyRprts {
resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunState, resources.PipelineRunState{rprt})
if err != nil {
rprt.InvalidTaskResultsInFinally = true
logger.Warnf("Declaring final task %q failed as it failed to resolve task params for %q with error %v", rprt.PipelineTask.Name, pr.Name, err)
continue
}
resources.ApplyTaskResults(resources.PipelineRunState{rprt}, resolvedResultRefs)
nextRprts = append(nextRprts, rprt)
}

for _, rprt := range nextRprts {
if rprt == nil || rprt.Skip(pipelineRunState, d) {
Expand Down
126 changes: 126 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3923,6 +3923,132 @@ func TestReconcilePipeline_TaskSpecMetadata(t *testing.T) {
}
}

func TestReconcileWithTaskResultsInFinalTasks(t *testing.T) {
names.TestingSeed()
ps := []*v1beta1.Pipeline{tb.Pipeline("test-pipeline", tb.PipelineNamespace("foo"), tb.PipelineSpec(
tb.PipelineTask("dag-task-1", "dag-task"),
tb.PipelineTask("dag-task-2", "dag-task"),
tb.FinalPipelineTask("final-task-1", "final-task-1",
tb.PipelineTaskParam("finalParam", "$(tasks.dag-task-1.results.aResult)"),
),
tb.FinalPipelineTask("final-task-2", "final-task-2",
tb.PipelineTaskParam("finalParam", "$(tasks.dag-task-2.results.aResult)"),
),
))}
prs := []*v1beta1.PipelineRun{tb.PipelineRun("test-pipeline-run-final-task-results", tb.PipelineRunNamespace("foo"),
tb.PipelineRunSpec("test-pipeline",
tb.PipelineRunServiceAccountName("test-sa-0"),
),
)}
ts := []*v1beta1.Task{
tb.Task("dag-task", tb.TaskNamespace("foo")),
tb.Task("final-task-1", tb.TaskNamespace("foo"),
tb.TaskSpec(
tb.TaskParam("finalParam", v1beta1.ParamTypeString),
),
),
tb.Task("final-task-2", tb.TaskNamespace("foo"),
tb.TaskSpec(
tb.TaskParam("finalParam", v1beta1.ParamTypeString),
),
),
}
trs := []*v1beta1.TaskRun{
tb.TaskRun("test-pipeline-run-final-task-results-dag-task-1-xxyyy",
tb.TaskRunNamespace("foo"),
tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results",
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
tb.Controller, tb.BlockOwnerDeletion,
),
tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"),
tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"),
tb.TaskRunLabel("tekton.dev/pipelineTask", "dag-task-1"),
tb.TaskRunSpec(
tb.TaskRunTaskRef("hello-world"),
tb.TaskRunServiceAccountName("test-sa"),
),
tb.TaskRunStatus(
tb.StatusCondition(
apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
},
),
tb.TaskRunResult("aResult", "aResultValue"),
),
),
tb.TaskRun("test-pipeline-run-final-task-results-dag-task-2-xxyyy",
tb.TaskRunNamespace("foo"),
tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results",
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
tb.Controller, tb.BlockOwnerDeletion,
),
tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"),
tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"),
tb.TaskRunLabel("tekton.dev/pipelineTask", "dag-task-2"),
tb.TaskRunSpec(
tb.TaskRunTaskRef("hello-world"),
tb.TaskRunServiceAccountName("test-sa"),
),
tb.TaskRunStatus(
tb.StatusCondition(
apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
},
),
),
),
}
d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
}
prt := NewPipelineRunTest(d, t)
defer prt.Cancel()

reconciledRun, clients := prt.reconcileRun("foo", "test-pipeline-run-final-task-results", []string{}, false)

expectedTaskRunName := "test-pipeline-run-final-task-results-final-task-1-9l9zj"
expectedTaskRun := tb.TaskRun(expectedTaskRunName,
tb.TaskRunNamespace("foo"),
tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-final-task-results",
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
tb.Controller, tb.BlockOwnerDeletion,
),
tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"),
tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-final-task-results"),
tb.TaskRunLabel("tekton.dev/pipelineTask", "final-task-1"),
tb.TaskRunSpec(
tb.TaskRunTaskRef("final-task-1"),
tb.TaskRunServiceAccountName("test-sa-0"),
tb.TaskRunParam("finalParam", "aResultValue"),
),
)
// Check that the expected TaskRun was created
actual, err := clients.Pipeline.TektonV1beta1().TaskRuns("foo").List(metav1.ListOptions{
LabelSelector: "tekton.dev/pipelineTask=final-task-1,tekton.dev/pipelineRun=test-pipeline-run-final-task-results",
Limit: 1,
})

if err != nil {
t.Fatalf("Failure to list TaskRun's %s", err)
}
if len(actual.Items) != 1 {
t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items))
}
actualTaskRun := actual.Items[0]
if d := cmp.Diff(&actualTaskRun, expectedTaskRun, ignoreResourceVersion); d != "" {
t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRunName, diff.PrintWantGot(d))
}

if s := reconciledRun.Status.TaskRuns["test-pipeline-run-final-task-results-final-task-2-mz4c7"].Status.GetCondition(apis.ConditionSucceeded); s.Status != corev1.ConditionFalse {
t.Fatalf("Status expected to be %s but is %s", corev1.ConditionFalse, s.Status)
}
}

// NewPipelineRunTest returns PipelineRunTest with a new PipelineRun controller created with specified state through data
// This PipelineRunTest can be reused for multiple PipelineRuns by calling reconcileRun for each pipelineRun
func NewPipelineRunTest(data test.Data, t *testing.T) *PipelineRunTest {
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type ResolvedPipelineRunTask struct {
ResolvedTaskResources *resources.ResolvedTaskResources
// ConditionChecks ~~TaskRuns but for evaling conditions
ResolvedConditionChecks TaskConditionCheckState // Could also be a TaskRun or maybe just a Pod?
// this flag is set for a task when task result resolution fails for that task
// and mainly used to create a task run object with failure for that task
InvalidTaskResultsInFinally bool
}

func (t ResolvedPipelineRunTask) IsDone() bool {
Expand Down
20 changes: 19 additions & 1 deletion pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ import (
"knative.dev/pkg/apis"
)

const (
// ReasonInvalidTaskResultsInFinallyTask indicates that the final task was not able to resolve
// task result of a DAG task
ReasonInvalidTaskResultsInFinallyTask = "InvalidTaskResultsInFinallyTask"
)

// PipelineRunState is a slice of ResolvedPipelineRunTasks the represents the current execution
// state of the PipelineRun.
type PipelineRunState []*ResolvedPipelineRunTask
Expand Down Expand Up @@ -264,7 +270,7 @@ func (state PipelineRunState) GetSkippedTasks(pr *v1beta1.PipelineRun, d *dag.Gr
func (state PipelineRunState) GetTaskRunsStatus(pr *v1beta1.PipelineRun) map[string]*v1beta1.PipelineRunTaskRunStatus {
status := make(map[string]*v1beta1.PipelineRunTaskRunStatus)
for _, rprt := range state {
if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil {
if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil && !rprt.InvalidTaskResultsInFinally {
continue
}

Expand Down Expand Up @@ -305,6 +311,18 @@ func (state PipelineRunState) GetTaskRunsStatus(pr *v1beta1.PipelineRun) map[str
})
}
}
// Maintain a TaskRun Object in pr.Status for a finally task which could not resolve task results
if rprt.InvalidTaskResultsInFinally {
if prtrs.Status == nil {
prtrs.Status = &v1beta1.TaskRunStatus{}
}
prtrs.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: ReasonInvalidTaskResultsInFinallyTask,
Message: fmt.Sprintf("Finally Task %s in PipelineRun %s was having invalid task results", rprt.PipelineTask.Name, pr.Name),
})
}
status[rprt.TaskRunName] = prtrs
}
return status
Expand Down
Loading

0 comments on commit 83e8c63

Please sign in to comment.