Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix recursion issue on Skip #3524

Merged
merged 2 commits into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1beta1
import (
"github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
)

// +genclient
Expand Down Expand Up @@ -174,7 +175,16 @@ func (pt PipelineTask) Deps() []string {
deps = append(deps, pt.resourceDeps()...)
deps = append(deps, pt.orderingDeps()...)

return deps
uniqueDeps := sets.NewString()
for _, w := range deps {
if uniqueDeps.Has(w) {
continue
}
uniqueDeps.Insert(w)

}

return uniqueDeps.List()
Copy link
Member

@pritidesai pritidesai Nov 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this significantly reduces the call to dag.visit and makes it possible for the unit test in this PR to complete on average under five seconds. The mysterious dot notation in dag.visit is still unsolved though:

visited[currentName+"."+n.Task.HashKey()] = true

Its creating a link between two nodes which are indirectly connected (C.A in A -> B -> C) but where/how is this data being utilized is a mystery.

}

func (pt PipelineTask) resourceDeps() []string {
Expand Down
6 changes: 5 additions & 1 deletion pkg/reconciler/pipeline/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,16 @@ func linkPipelineTasks(prev *Node, next *Node) error {
}

func visit(currentName string, nodes []*Node, path []string, visited map[string]bool) error {
var sb strings.Builder
for _, n := range nodes {
path = append(path, n.Task.HashKey())
if _, ok := visited[n.Task.HashKey()]; ok {
return errors.New(getVisitedPath(path))
}
visited[currentName+"."+n.Task.HashKey()] = true
sb.WriteString(currentName)
sb.WriteByte('.')
sb.WriteString(n.Task.HashKey())
visited[sb.String()] = true
if err := visit(n.Task.HashKey(), n.Prev, path, visited); err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
return err
}

// Reset the skipped status to trigger recalculation
pipelineRunFacts.ResetSkippedCache()

after := pipelineRunFacts.GetPipelineConditionStatus(pr, logger)
switch after.Status {
case corev1.ConditionTrue:
Expand Down Expand Up @@ -565,6 +568,9 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
}

resources.ApplyTaskResults(nextRprts, resolvedResultRefs)
// After we apply Task Results, we may be able to evaluate more
// when expressions, so reset the skipped cache
pipelineRunFacts.ResetSkippedCache()

// GetFinalTasks only returns tasks when a DAG is complete
nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...)
Expand Down
24 changes: 17 additions & 7 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,7 @@ func (t *ResolvedPipelineRunTask) checkParentsDone(facts *PipelineRunFacts) bool
return true
}

// Skip returns true if a PipelineTask will not be run because
// (1) its When Expressions evaluated to false
// (2) its Condition Checks failed
// (3) its parent task was skipped
// (4) Pipeline is in stopping state (one of the PipelineTasks failed)
// Note that this means Skip returns false if a conditionCheck is in progress
func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool {
func (t *ResolvedPipelineRunTask) skip(facts *PipelineRunFacts) bool {
if facts.isFinalTask(t.PipelineTask.Name) || t.IsStarted() {
return false
}
Expand All @@ -151,6 +145,22 @@ func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool {
return false
}

// Skip returns true if a PipelineTask will not be run because
// (1) its When Expressions evaluated to false
// (2) its Condition Checks failed
// (3) its parent task was skipped
// (4) Pipeline is in stopping state (one of the PipelineTasks failed)
// Note that this means Skip returns false if a conditionCheck is in progress
func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool {
if facts.SkipCache == nil {
facts.SkipCache = make(map[string]bool)
}
if _, cached := facts.SkipCache[t.PipelineTask.Name]; !cached {
facts.SkipCache[t.PipelineTask.Name] = t.skip(facts) // t.skip() is same as our existing t.Skip()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest removing the comment now that the original t.Skip is renamed t.skip

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}
return facts.SkipCache[t.PipelineTask.Name]
}

func (t *ResolvedPipelineRunTask) conditionsSkip() bool {
if len(t.ResolvedConditionChecks) > 0 {
if t.ResolvedConditionChecks.IsDone() && !t.ResolvedConditionChecks.IsSuccess() {
Expand Down
15 changes: 15 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ type PipelineRunFacts struct {
State PipelineRunState
TasksGraph *dag.Graph
FinalTasksGraph *dag.Graph

// SkipCache is a hash of PipelineTask names that stores whether a task will be
// executed or not, because it's either not reachable via the DAG due to the pipeline
// state, or because it has failed conditions.
// We cache this data along the state, because it's expensive to compute, it requires
// traversing potentially the whole graph; this way it can built incrementally, when
// needed, via the `Skip` method in pipelinerunresolution.go
// The skip data is sensitive to changes in the state. The ResetSkippedCache method
// can be used to clean the cache and force re-computation when needed.
SkipCache map[string]bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might be able to instantiate this cache based on pr.Status.SkippedTasks which holds the list of skippedTasks.

I will not block this PR for skippedTasks, we can revisit this in next iteration. @jerop?

}

// pipelineRunStatusCount holds the count of successful, failed, cancelled, skipped, and incomplete tasks
Expand All @@ -53,6 +63,11 @@ type pipelineRunStatusCount struct {
Incomplete int
}

// ResetSkippedCache resets the skipped cache in the facts map
func (facts *PipelineRunFacts) ResetSkippedCache() {
facts.SkipCache = make(map[string]bool)
}

// ToMap returns a map that maps pipeline task name to the resolved pipeline run task
func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineRunTask {
m := make(map[string]*ResolvedPipelineRunTask)
Expand Down
65 changes: 65 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package resources

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -408,6 +409,7 @@ func TestGetNextTaskWithRetries(t *testing.T) {
}

func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
largePipelineState := buildPipelineStateWithLargeDepencyGraph(t)
tcs := []struct {
name string
state PipelineRunState
Expand Down Expand Up @@ -454,6 +456,10 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
"not skipped since it failed",
state: conditionCheckFailedWithOthersFailedState,
expectedNames: []string{pts[5].Name},
}, {
name: "large deps, not started",
state: largePipelineState,
expectedNames: []string{},
}}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -474,6 +480,65 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
}
}

func buildPipelineStateWithLargeDepencyGraph(t *testing.T) PipelineRunState {
t.Helper()
var task = &v1beta1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "task",
},
Spec: v1beta1.TaskSpec{
Steps: []v1beta1.Step{{Container: corev1.Container{
Name: "step1",
}}},
},
}
var pipelineRunState PipelineRunState
pipelineRunState = []*ResolvedPipelineRunTask{{
PipelineTask: &v1beta1.PipelineTask{
Name: "t1",
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
TaskRun: nil,
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}}
for i := 2; i < 400; i++ {
dependFrom := 1
if i > 10 {
if i%10 == 0 {
dependFrom = i - 10
} else {
dependFrom = i - (i % 10)
}
}
params := []v1beta1.Param{}
var alpha byte
for alpha = 'a'; alpha <= 'j'; alpha++ {
params = append(params, v1beta1.Param{
Name: fmt.Sprintf("%c", alpha),
Value: v1beta1.ArrayOrString{
Type: v1beta1.ParamTypeString,
StringVal: fmt.Sprintf("$(tasks.t%d.results.%c)", dependFrom, alpha),
},
})
}
pipelineRunState = append(pipelineRunState, &ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: fmt.Sprintf("t%d", i),
Params: params,
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
TaskRun: nil,
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
},
)
}
return pipelineRunState
}

func TestPipelineRunState_GetFinalTasks(t *testing.T) {
tcs := []struct {
name string
Expand Down