Consistent node execution order by sorting nodes with SequentialRunner #1604

Merged
2 changes: 2 additions & 0 deletions RELEASE.md
@@ -25,10 +25,12 @@
* Reduced number of log lines by changing the logging level from `INFO` to `DEBUG` for low priority messages.
* Kedro's framework-side logging configuration no longer performs file-based logging. Hence superfluous `info.log`/`errors.log` files are no longer created in your project root, and running Kedro on read-only file systems such as Databricks Repos is now possible.
* The `root` logger is now set to the Python default level of `WARNING` rather than `INFO`. Kedro's logger is still set to emit `INFO` level messages.
+* Kedro pipeline will have consistent execution order given the same set of nodes when using with `SequentialRunner`.

Contributor

Suggested change
-* Kedro pipeline will have consistent execution order given the same set of nodes when using with `SequentialRunner`.
+* `SequentialRunner` now consistently runs nodes in the same order across multiple runs.

I still don't think this is a very clear explanation though. Maybe what you have is better 🤔

Contributor Author

Maybe @MerelTheisenQB has a better idea.

Member

I don't know if this is any better to be honest 😅 "Added sorting of nodes for the SequentialRunner to facilitate consistent execution order across multiple runs."

* Bumped the upper bound for the Flake8 dependency to <5.0.
* `kedro jupyter notebook/lab` no longer reuses a Jupyter kernel.
* Required `cookiecutter>=2.1.1` to address a [known command injection vulnerability](https://security.snyk.io/vuln/SNYK-PYTHON-COOKIECUTTER-2414281).


## Upcoming deprecations for Kedro 0.19.0
* `kedro.extras.ColorHandler` will be removed in 0.19.0.

8 changes: 5 additions & 3 deletions kedro/pipeline/pipeline.py
@@ -352,7 +352,7 @@ def nodes(self) -> List[Node]:
         return list(chain.from_iterable(self._topo_sorted_nodes))

     @property
-    def grouped_nodes(self) -> List[Set[Node]]:
+    def grouped_nodes(self) -> List[List[Node]]:
         """Return a list of the pipeline nodes in topologically ordered groups,
         i.e. if node A needs to be run before node B, it will appear in an
         earlier group.
@@ -870,7 +870,7 @@ def _validate_transcoded_inputs_outputs(nodes: List[Node]) -> None:
         )


-def _topologically_sorted(node_dependencies) -> List[Set[Node]]:
+def _topologically_sorted(node_dependencies) -> List[List[Node]]:
     """Topologically group and sort (order) nodes such that no node depends on
     a node that appears in the same or a later group.
@@ -894,7 +894,9 @@ def _circle_error_message(error_data: Dict[str, str]) -> str:
         return f"Circular dependencies exist among these items: {circular}"

     try:
-        return list(toposort(node_dependencies))
+        # Sort it so it has consistent order when run with SequentialRunner
+        result = [sorted(dependencies) for dependencies in toposort(node_dependencies)]
+        return result
     except ToposortCircleError as exc:
         message = _circle_error_message(exc.data)
         raise CircularDependencyError(message) from exc
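
The idea behind this hunk can be illustrated outside Kedro. The sketch below is not the project's code: `toposort_groups` is a simplified, hypothetical stand-in for the `toposort` package, and plain strings stand in for `Node` objects (sorting real nodes assumes they are orderable). It shows how sorting each topological group turns an unordered set into a list whose flattened order is the same on every run.

```python
from typing import Dict, Iterator, List, Set


def toposort_groups(dependencies: Dict[str, Set[str]]) -> Iterator[Set[str]]:
    """Yield sets of items whose dependencies have all been yielded already.

    Simplified stand-in for the `toposort` package (an assumption for this
    sketch), not its actual implementation.
    """
    # Copy the mapping so the caller's data is untouched, and register items
    # that only ever appear as dependencies.
    remaining = {item: set(deps) for item, deps in dependencies.items()}
    for deps in dependencies.values():
        for dep in deps:
            remaining.setdefault(dep, set())

    while remaining:
        ready = {item for item, deps in remaining.items() if not deps}
        if not ready:
            raise ValueError(
                f"Circular dependencies exist among these items: {sorted(remaining)}"
            )
        yield ready
        remaining = {
            item: deps - ready
            for item, deps in remaining.items()
            if item not in ready
        }


def topologically_sorted(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Sort every group so the flattened execution order is reproducible."""
    return [sorted(group) for group in toposort_groups(dependencies)]


# Hypothetical node names: "split" needs "load"; both training nodes need "split".
example = {
    "split": {"load"},
    "train_b": {"split"},
    "train_a": {"split"},
}
groups = topologically_sorted(example)
print(groups)  # [['load'], ['split'], ['train_a', 'train_b']]
```

Without the `sorted` call, the last group would be a set, and the order in which `train_a` and `train_b` are visited could differ between interpreter runs.
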
5 changes: 3 additions & 2 deletions tests/pipeline/test_pipeline.py
@@ -253,8 +253,9 @@ def test_grouped_nodes(self, input_data):
         grouped = pipeline.grouped_nodes
         # Flatten a list of grouped nodes
         assert pipeline.nodes == list(chain.from_iterable(grouped))
-        # Check each grouped node matches with expected group
-        assert all(g == e for g, e in zip(grouped, expected))
+        # Check each grouped node matches with the expected group, the order is
+        # non-deterministic, so we are only checking they have the same set of nodes.
+        assert all(set(g) == e for g, e in zip(grouped, expected))

     def test_free_input(self, input_data):
         nodes = input_data["nodes"]
2 changes: 1 addition & 1 deletion tests/pipeline/test_pipeline_with_transcoding.py
@@ -121,7 +121,7 @@ def test_grouped_nodes(self, input_data):
         # Flatten a list of grouped nodes
         assert pipeline.nodes == list(chain.from_iterable(grouped))
         # Check each grouped node matches with expected group
-        assert all(g == e for g, e in zip(grouped, expected))
+        assert all(set(g) == e for g, e in zip(grouped, expected))

     def test_free_input(self, input_data):
         nodes = input_data["nodes"]
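
The switch to `set(g) == e` in both test files follows from `grouped_nodes` now returning lists. A minimal sketch of why the old assertion would fail, assuming the `expected` fixtures are sets (as the new comparison suggests) and using hypothetical string names in place of `Node` objects:

```python
# Hypothetical data: strings stand in for Node objects.
grouped = [["load"], ["train_a", "train_b"]]   # groups as lists
expected = [{"load"}, {"train_b", "train_a"}]  # fixtures as sets

# A list never compares equal to a set, even when the members are identical.
direct = all(g == e for g, e in zip(grouped, expected))

# Converting each group to a set compares membership only.
as_sets = all(set(g) == e for g, e in zip(grouped, expected))

print(direct, as_sets)  # False True
```
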