Skip to content

Commit

Permalink
fix cycle detection logic
Browse files Browse the repository at this point in the history
The visited map holds the names of the nodes that have been visited as keys and "true"
as the value. This map is updated with currentName.HashKey on every
iteration, which results in keys such as a.d and b.d, where a, b, and d
are nodes of a DAG. But such keys are not used anywhere in the
visit function. The visit function checks for the existence of a node just
by its name, without any string concatenation.

These extra additions to the map cause severe delays for a graph
with more than 60 nodes.
  • Loading branch information
pritidesai committed Nov 24, 2020
1 parent 2637b10 commit 0b4864b
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 12 deletions.
14 changes: 4 additions & 10 deletions pkg/reconciler/pipeline/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,28 +126,22 @@ func linkPipelineTasks(prev *Node, next *Node) error {
return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.HashKey())
}
// Check if we are adding cycles.
visited := map[string]bool{prev.Task.HashKey(): true, next.Task.HashKey(): true}
path := []string{next.Task.HashKey(), prev.Task.HashKey()}
if err := visit(next.Task.HashKey(), prev.Prev, path, visited); err != nil {
if err := lookForNode(prev.Prev, path, next.Task.HashKey()); err != nil {
return fmt.Errorf("cycle detected: %w", err)
}
next.Prev = append(next.Prev, prev)
prev.Next = append(prev.Next, next)
return nil
}

func visit(currentName string, nodes []*Node, path []string, visited map[string]bool) error {
var sb strings.Builder
func lookForNode(nodes []*Node, path []string, next string) error {
for _, n := range nodes {
path = append(path, n.Task.HashKey())
if _, ok := visited[n.Task.HashKey()]; ok {
if n.Task.HashKey() == next {
return errors.New(getVisitedPath(path))
}
sb.WriteString(currentName)
sb.WriteByte('.')
sb.WriteString(n.Task.HashKey())
visited[sb.String()] = true
if err := visit(n.Task.HashKey(), n.Prev, path, visited); err != nil {
if err := lookForNode(n.Prev, path, next); err != nil {
return err
}
}
Expand Down
83 changes: 81 additions & 2 deletions pkg/reconciler/pipeline/dag/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package dag_test

import (
"strings"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -479,7 +480,7 @@ func TestBuild_ConditionsParamsFromTaskResults(t *testing.T) {
assertSameDAG(t, expectedDAG, g)
}

func TestBuild_Invalid(t *testing.T) {
func TestBuild_InvalidDAG(t *testing.T) {
a := v1beta1.PipelineTask{Name: "a"}
xDependsOnA := v1beta1.PipelineTask{
Name: "x",
Expand Down Expand Up @@ -539,37 +540,114 @@ func TestBuild_Invalid(t *testing.T) {
Resources: []v1beta1.PipelineTaskInputResource{{From: []string{"none"}}},
}},
}
aRunsAfterE := v1beta1.PipelineTask{Name: "a", RunAfter: []string{"e"}}
bDependsOnA := v1beta1.PipelineTask{
Name: "b",
Resources: &v1beta1.PipelineTaskResources{
Inputs: []v1beta1.PipelineTaskInputResource{{From: []string{"a"}}},
},
}
cRunsAfterA := v1beta1.PipelineTask{
Name: "c",
RunAfter: []string{"a"},
}
dDependsOnBAndC := v1beta1.PipelineTask{
Name: "d",
Resources: &v1beta1.PipelineTaskResources{
Inputs: []v1beta1.PipelineTaskInputResource{{From: []string{"b", "c"}}},
},
}
eRunsAfterD := v1beta1.PipelineTask{
Name: "e",
RunAfter: []string{"d"},
}
fRunsAfterD := v1beta1.PipelineTask{
Name: "f",
RunAfter: []string{"d"},
}
gDependsOnF := v1beta1.PipelineTask{
Name: "g",
Resources: &v1beta1.PipelineTaskResources{
Inputs: []v1beta1.PipelineTaskInputResource{{From: []string{"f"}}},
},
}

tcs := []struct {
name string
spec v1beta1.PipelineSpec
err string
}{{
// a
// |
// a ("a" depends on resource from "a")
name: "self-link-from",
spec: v1beta1.PipelineSpec{Tasks: []v1beta1.PipelineTask{selfLinkFrom}},
err: "cycle detected",
}, {
// a
// |
// a ("a" runAfter "a")
name: "self-link-after",
spec: v1beta1.PipelineSpec{Tasks: []v1beta1.PipelineTask{selfLinkAfter}},
err: "cycle detected",
}, {
// a (also "a" depends on resource from "z")
// |
// x ("x" depends on resource from "a")
// |
// z ("z" depends on resource from "x")
name: "cycle-from",
spec: v1beta1.PipelineSpec{Tasks: []v1beta1.PipelineTask{xDependsOnA, zDependsOnX, aDependsOnZ}},
err: "cycle detected",
}, {
// a (also "a" runAfter "z")
// |
// x ("x" runAfter "a")
// |
// z ("z" runAfter "x")
name: "cycle-runAfter",
spec: v1beta1.PipelineSpec{Tasks: []v1beta1.PipelineTask{xAfterA, zAfterX, aAfterZ}},
err: "cycle detected",
}, {
// a (also "a" depends on resource from "z")
// |
// x ("x" depends on resource from "a")
// |
// z ("z" runAfter "x")
name: "cycle-both",
spec: v1beta1.PipelineSpec{Tasks: []v1beta1.PipelineTask{xDependsOnA, zAfterX, aDependsOnZ}},
err: "cycle detected",
}, {
// This test makes sure we detect a cyclic branch in a DAG with multiple branches.
// The following DAG has a cyclic branch with an additional dependency (a runAfter e)
// a
// / \
// b c
// \ /
// d
// / \
// e f
// |
// g
name: "multiple-branches-with-one-cyclic-branch",
spec: v1beta1.PipelineSpec{Tasks: []v1beta1.PipelineTask{aRunsAfterE, bDependsOnA, cRunsAfterA, dDependsOnBAndC, eRunsAfterD, fRunsAfterD, gDependsOnF}},
err: "cycle detected",
}, {
name: "duplicate-tasks",
spec: v1beta1.PipelineSpec{Tasks: []v1beta1.PipelineTask{a, a}},
err: "duplicate pipeline task",
}, {
name: "invalid-task-name-from",
spec: v1beta1.PipelineSpec{Tasks: []v1beta1.PipelineTask{invalidTaskFrom}},
err: "wasn't present in Pipeline",
}, {
name: "invalid-task-name-after",
spec: v1beta1.PipelineSpec{Tasks: []v1beta1.PipelineTask{invalidTaskAfter}},
err: "wasn't present in Pipeline",
}, {
name: "invalid-task-name-from-conditional",
spec: v1beta1.PipelineSpec{Tasks: []v1beta1.PipelineTask{invalidConditionalTask}},
err: "wasn't present in Pipeline",
},
}
for _, tc := range tcs {
Expand All @@ -578,7 +656,8 @@ func TestBuild_Invalid(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: tc.name},
Spec: tc.spec,
}
if _, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks)); err == nil {
_, err := dag.Build(v1beta1.PipelineTaskList(p.Spec.Tasks))
if err == nil || !strings.Contains(err.Error(), tc.err) {
t.Errorf("expected to see an error for invalid DAG in pipeline %v but had none", tc.spec)
}
})
Expand Down

0 comments on commit 0b4864b

Please sign in to comment.