Skip to content

Commit

Permalink
Consistent node execution order by sorting node with `Sequentialrunne…
Browse files Browse the repository at this point in the history
…r` (#1604)

* Consistent node execution order by sorting node

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Adding the unittest

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* fix test and linting

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Fix unittests

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Update README

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* remove test

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Minor refactoring with list comprehension

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* Fix Linting

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>

* minor update of release notes

Signed-off-by: Nok Chan <nok.lam.chan@quantumblack.com>
  • Loading branch information
noklam authored Jun 16, 2022
1 parent 4710b6c commit 6768d7f
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 6 deletions.
2 changes: 2 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
* Reduced number of log lines by changing the logging level from `INFO` to `DEBUG` for low priority messages.
* Kedro's framework-side logging configuration no longer performs file-based logging. Hence superfluous `info.log`/`errors.log` files are no longer created in your project root, and running Kedro on read-only file systems such as Databricks Repos is now possible.
* The `root` logger is now set to the Python default level of `WARNING` rather than `INFO`. Kedro's logger is still set to emit `INFO` level messages.
* `SequentialRunner` now has consistent execution order across multiple runs with sorted nodes.
* Bumped the upper bound for the Flake8 dependency to <5.0.
* `kedro jupyter notebook/lab` no longer reuses a Jupyter kernel.
* Required `cookiecutter>=2.1.1` to address a [known command injection vulnerability](https://security.snyk.io/vuln/SNYK-PYTHON-COOKIECUTTER-2414281).


## Upcoming deprecations for Kedro 0.19.0
* `kedro.extras.ColorHandler` will be removed in 0.19.0.

Expand Down
8 changes: 5 additions & 3 deletions kedro/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def nodes(self) -> List[Node]:
return list(chain.from_iterable(self._topo_sorted_nodes))

@property
def grouped_nodes(self) -> List[Set[Node]]:
def grouped_nodes(self) -> List[List[Node]]:
"""Return a list of the pipeline nodes in topologically ordered groups,
i.e. if node A needs to be run before node B, it will appear in an
earlier group.
Expand Down Expand Up @@ -870,7 +870,7 @@ def _validate_transcoded_inputs_outputs(nodes: List[Node]) -> None:
)


def _topologically_sorted(node_dependencies) -> List[Set[Node]]:
def _topologically_sorted(node_dependencies) -> List[List[Node]]:
"""Topologically group and sort (order) nodes such that no node depends on
a node that appears in the same or a later group.
Expand All @@ -894,7 +894,9 @@ def _circle_error_message(error_data: Dict[str, str]) -> str:
return f"Circular dependencies exist among these items: {circular}"

try:
return list(toposort(node_dependencies))
# Sort it so it has consistent order when run with SequentialRunner
result = [sorted(dependencies) for dependencies in toposort(node_dependencies)]
return result
except ToposortCircleError as exc:
message = _circle_error_message(exc.data)
raise CircularDependencyError(message) from exc
Expand Down
5 changes: 3 additions & 2 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,9 @@ def test_grouped_nodes(self, input_data):
grouped = pipeline.grouped_nodes
# Flatten a list of grouped nodes
assert pipeline.nodes == list(chain.from_iterable(grouped))
# Check each grouped node matches with expected group
assert all(g == e for g, e in zip(grouped, expected))
# Check each grouped node matches with the expected group, the order is
# non-deterministic, so we are only checking they have the same set of nodes.
assert all(set(g) == e for g, e in zip(grouped, expected))

def test_free_input(self, input_data):
nodes = input_data["nodes"]
Expand Down
2 changes: 1 addition & 1 deletion tests/pipeline/test_pipeline_with_transcoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def test_grouped_nodes(self, input_data):
# Flatten a list of grouped nodes
assert pipeline.nodes == list(chain.from_iterable(grouped))
# Check each grouped node matches with expected group
assert all(g == e for g, e in zip(grouped, expected))
assert all(set(g) == e for g, e in zip(grouped, expected))

def test_free_input(self, input_data):
nodes = input_data["nodes"]
Expand Down

0 comments on commit 6768d7f

Please sign in to comment.