From e8f9f09d01cca9a6b9f99006ad482988319bb297 Mon Sep 17 00:00:00 2001
From: Takieddine Kadiri
Date: Sat, 17 Oct 2020 17:10:51 +0200
Subject: [PATCH] FIX #66 - Accessing project context inside hooks (and
 consequently fix #30 #64 #72)

---
 CHANGELOG.md                                  |  16 ++
 kedro_mlflow/framework/cli/cli.py             |  14 +-
 .../framework/context/mlflow_context.py       |  19 +--
 kedro_mlflow/framework/hooks/node_hook.py     |  41 ++++-
 kedro_mlflow/framework/hooks/pipeline_hook.py |  14 +-
 tests/conftest.py                             |  41 ++++-
 .../framework/context/test_mlflow_context.py  |  48 +++++-
 tests/framework/hooks/test_node_hook.py       | 152 +++++++++++++++---
 tests/framework/hooks/test_pipeline_hook.py   |   5 +-
 9 files changed, 296 insertions(+), 54 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e720ed60..d652863b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,22 @@
 
 ## [Unreleased]
 
+### Added
+
+-
+
+### Fixed
+
+- `get_mlflow_config` now uses the Kedro context's `config_loader` to get configs (#66). This indirectly solves the following issues:
+  - `get_mlflow_config` now works in interactive mode if `load_context` is called with a path different from the working directory (#30)
+  - kedro_mlflow now works fine with `kedro jupyter notebook` independently of the working directory (#64)
+  - You can use global variables in `mlflow.yml`, which is now properly parsed if you use a `TemplatedConfigLoader` (#72)
+- `mlflow init` now gets the conf path from `context.CONF_ROOT` instead of a hardcoded `conf` folder. This makes the package robust to Kedro changes.
+
+### Changed
+
+- `MlflowNodeHook` now has a `before_pipeline_run` hook which stores the `ProjectContext` and enables the configuration to be retrieved.
+
 ## [0.3.0] - 2020-10-11
 
 ### Added
diff --git a/kedro_mlflow/framework/cli/cli.py b/kedro_mlflow/framework/cli/cli.py
index c58f11a7..2b68f7a7 100644
--- a/kedro_mlflow/framework/cli/cli.py
+++ b/kedro_mlflow/framework/cli/cli.py
@@ -4,6 +4,7 @@
 
 import click
 from kedro import __file__ as KEDRO_PATH
+from kedro.framework.context import load_context
 
 from kedro_mlflow.framework.cli.cli_utils import (
     render_jinja_template,
@@ -88,6 +89,8 @@ def init(force, silent):
     # get constants
     project_path = Path().cwd()
     project_globals = _get_project_globals()
+    context = load_context(project_path)
+    conf_root = context.CONF_ROOT
 
     # mlflow.yml is just a static file,
     # but the name of the experiment is set to be the same as the project
@@ -95,12 +98,14 @@
     write_jinja_template(
         src=TEMPLATE_FOLDER_PATH / mlflow_yml,
         is_cookiecutter=False,
-        dst=project_path / "conf" / "base" / mlflow_yml,
+        dst=project_path / conf_root / "base" / mlflow_yml,
         python_package=project_globals["python_package"],
     )
     if not silent:
         click.secho(
-            click.style("'conf/base/mlflow.yml' successfully updated.", fg="green")
+            click.style(
+                f"'{conf_root}/base/mlflow.yml' successfully updated.", fg="green"
+            )
         )
     # make a check whether the project run.py is strictly identical to the template
     # if yes, replace the script by the template silently
@@ -184,8 +189,11 @@ def ui(project_path, env):
 
     """
+    if not project_path:
+        project_path = Path().cwd()
+    context = load_context(project_path=project_path, env=env)
     # the context must contains the self.mlflow attribues with mlflow configuration
-    mlflow_conf = get_mlflow_config(project_path=project_path, env=env)
+    mlflow_conf = get_mlflow_config(context)
 
     # call mlflow ui with specific options
     # TODO : add more options for ui
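
The `ui` command above now resolves the configuration through an explicitly loaded context. A minimal sketch, not part of the patch, of the same call pattern from an interactive session or a notebook (the project path and env below are placeholders):

    from kedro.framework.context import load_context
    from kedro_mlflow.framework.context import get_mlflow_config

    # load the project context explicitly; the working directory no longer matters
    context = load_context("path/to/my_kedro_project", env="local")

    # mlflow.yml is read through the context's config_loader
    mlflow_conf = get_mlflow_config(context)
    print(mlflow_conf.mlflow_tracking_uri)
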
diff --git a/kedro_mlflow/framework/context/mlflow_context.py b/kedro_mlflow/framework/context/mlflow_context.py
index 98bf8e93..8ed91bae 100644
--- a/kedro_mlflow/framework/context/mlflow_context.py
+++ b/kedro_mlflow/framework/context/mlflow_context.py
@@ -1,6 +1,4 @@
-from pathlib import Path
-
-from kedro.config import ConfigLoader
+from kedro.framework.context import KedroContext
 
 from kedro_mlflow.framework.context.config import KedroMlflowConfig
 
@@ -8,16 +6,9 @@
 # this could be a read-only property in the context
 # with a @property decorator
 # but for consistency with hook system, it is an external function
-def get_mlflow_config(project_path=None, env="local"):
-    if project_path is None:
-        project_path = Path.cwd()
-    project_path = Path(project_path)
-    conf_paths = [
-        str(project_path / "conf" / "base"),
-        str(project_path / "conf" / env),
-    ]
-    config_loader = ConfigLoader(conf_paths=conf_paths)
-    conf_mlflow_yml = config_loader.get("mlflow*", "mlflow*/**")
-    conf_mlflow = KedroMlflowConfig(project_path=project_path)
+def get_mlflow_config(context: KedroContext):
+
+    conf_mlflow_yml = context.config_loader.get("mlflow*", "mlflow*/**")
+    conf_mlflow = KedroMlflowConfig(context.project_path)
     conf_mlflow.from_dict(conf_mlflow_yml)
     return conf_mlflow
diff --git a/kedro_mlflow/framework/hooks/node_hook.py b/kedro_mlflow/framework/hooks/node_hook.py
index 95585df4..5792a634 100644
--- a/kedro_mlflow/framework/hooks/node_hook.py
+++ b/kedro_mlflow/framework/hooks/node_hook.py
@@ -1,8 +1,10 @@
 from typing import Any, Dict
 
 import mlflow
+from kedro.framework.context import load_context
 from kedro.framework.hooks import hook_impl
 from kedro.io import DataCatalog
+from kedro.pipeline import Pipeline
 from kedro.pipeline.node import Node
 
 from kedro_mlflow.framework.context import get_mlflow_config
@@ -10,7 +12,44 @@
 
 class MlflowNodeHook:
     def __init__(self):
-        config = get_mlflow_config()
+        self.context = None
+        self.flatten = False
+        self.recursive = True
+        self.sep = "."
+
+    @hook_impl
+    def before_pipeline_run(
+        self, run_params: Dict[str, Any], pipeline: Pipeline, catalog: DataCatalog
+    ) -> None:
+        """Hook to be invoked before a pipeline runs.
+        Args:
+            run_params: The params needed for the given run.
+                Should be identical to the data logged by Journal.
+                # @fixme: this needs to be modelled explicitly as code, instead of comment
+                Schema: {
+                    "run_id": str,
+                    "project_path": str,
+                    "env": str,
+                    "kedro_version": str,
+                    "tags": Optional[List[str]],
+                    "from_nodes": Optional[List[str]],
+                    "to_nodes": Optional[List[str]],
+                    "node_names": Optional[List[str]],
+                    "from_inputs": Optional[List[str]],
+                    "load_versions": Optional[List[str]],
+                    "pipeline_name": str,
+                    "extra_params": Optional[Dict[str, Any]],
+                }
+            pipeline: The ``Pipeline`` that will be run.
+            catalog: The ``DataCatalog`` to be used during the run.
+        """
+
+        self.context = load_context(
+            project_path=run_params["project_path"],
+            env=run_params["env"],
+            extra_params=run_params["extra_params"],
+        )
+        config = get_mlflow_config(self.context)
         self.flatten = config.node_hook_opts["flatten_dict_params"]
         self.recursive = config.node_hook_opts["recursive"]
         self.sep = config.node_hook_opts["sep"]
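
The docstring above carries an "@fixme" asking for the run_params schema to be modelled as code rather than as a comment. One possible sketch of that schema as a TypedDict, offered here only as an illustration of the fixme (not something this patch adds):

    from typing import Any, Dict, List, Optional
    from typing import TypedDict  # Python 3.8+; use typing_extensions.TypedDict on older interpreters

    class RunParams(TypedDict):
        """Typed view of the run_params dict passed to before_pipeline_run."""
        run_id: str
        project_path: str
        env: str
        kedro_version: str
        tags: Optional[List[str]]
        from_nodes: Optional[List[str]]
        to_nodes: Optional[List[str]]
        node_names: Optional[List[str]]
        from_inputs: Optional[List[str]]
        load_versions: Optional[List[str]]
        pipeline_name: str
        extra_params: Optional[Dict[str, Any]]
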
+ """ + + self.context = load_context( + project_path=run_params["project_path"], + env=run_params["env"], + extra_params=run_params["extra_params"], + ) + config = get_mlflow_config(self.context) self.flatten = config.node_hook_opts["flatten_dict_params"] self.recursive = config.node_hook_opts["recursive"] self.sep = config.node_hook_opts["sep"] diff --git a/kedro_mlflow/framework/hooks/pipeline_hook.py b/kedro_mlflow/framework/hooks/pipeline_hook.py index 3073602d..6b754dd1 100644 --- a/kedro_mlflow/framework/hooks/pipeline_hook.py +++ b/kedro_mlflow/framework/hooks/pipeline_hook.py @@ -4,6 +4,7 @@ import mlflow import yaml +from kedro.framework.context import load_context from kedro.framework.hooks import hook_impl from kedro.io import DataCatalog from kedro.pipeline import Pipeline @@ -12,11 +13,14 @@ from kedro_mlflow.framework.context import get_mlflow_config from kedro_mlflow.io import MlflowMetricsDataSet from kedro_mlflow.mlflow import KedroPipelineModel -from kedro_mlflow.pipeline.pipeline_ml_factory import PipelineML +from kedro_mlflow.pipeline.pipeline_ml import PipelineML from kedro_mlflow.utils import _parse_requirements class MlflowPipelineHook: + def __init__(self): + self.context = None + @hook_impl def after_catalog_created( self, @@ -62,9 +66,13 @@ def before_pipeline_run( pipeline: The ``Pipeline`` that will be run. catalog: The ``DataCatalog`` to be used during the run. """ - mlflow_conf = get_mlflow_config( - project_path=run_params["project_path"], env=run_params["env"] + self.context = load_context( + project_path=run_params["project_path"], + env=run_params["env"], + extra_params=run_params["extra_params"], ) + + mlflow_conf = get_mlflow_config(self.context) mlflow.set_tracking_uri(mlflow_conf.mlflow_tracking_uri) # TODO : if the pipeline fails, we need to be able to end stop the mlflow run # cannot figure out how to do this within hooks diff --git a/tests/conftest.py b/tests/conftest.py index d4a6d529..fe76ab98 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import os from pathlib import Path from typing import Dict @@ -49,7 +50,45 @@ def config_dir(tmp_path): credentials = tmp_path / "conf" / env / "credentials.yml" logging = tmp_path / "conf" / env / "logging.yml" parameters = tmp_path / "conf" / env / "parameters.yml" + globals_yaml = tmp_path / "conf" / env / "globals.yml" + kedro_yaml = tmp_path / ".kedro.yml" _write_yaml(catalog, dict()) _write_yaml(parameters, dict()) + _write_yaml(globals_yaml, dict()) _write_yaml(credentials, dict()) - _write_yaml(logging, _get_local_logging_config()) + _write_yaml(logging, _get_local_logging_config()), + + _write_yaml( + kedro_yaml, + dict( + { + "context_path": "dummy_package.run.ProjectContext", + "project_name": "dummy_package", + "project_version": "0.16.4", + "package_name": "dummy_package", + } + ), + ) + + os.mkdir(tmp_path / "src") + os.mkdir(tmp_path / "src" / "dummy_package") + with open(tmp_path / "src" / "dummy_package" / "run.py", "w") as f: + f.writelines( + [ + "from kedro.framework.context import KedroContext\n", + "from kedro.config import TemplatedConfigLoader \n" + "class ProjectContext(KedroContext):\n", + " project_name = 'dummy_package'\n", + " project_version = '0.16.4'\n", + " package_name = 'dummy_package'\n", + ] + ) + f.writelines( + [ + " def _create_config_loader(self, conf_paths):\n", + " return TemplatedConfigLoader(\n", + " conf_paths,\n", + " globals_pattern='globals.yml'\n", + " )\n", + ] + ) diff --git a/tests/framework/context/test_mlflow_context.py 
diff --git a/tests/framework/context/test_mlflow_context.py b/tests/framework/context/test_mlflow_context.py
index 4cf4cee6..6493f7e0 100644
--- a/tests/framework/context/test_mlflow_context.py
+++ b/tests/framework/context/test_mlflow_context.py
@@ -1,4 +1,5 @@
 import yaml
+from kedro.framework.context import load_context
 
 from kedro_mlflow.framework.context import get_mlflow_config
 
@@ -7,15 +8,17 @@
 # get_mlflow_config(project_path=tmp_path,env="local")
 
 
+def _write_yaml(filepath, config):
+    filepath.parent.mkdir(parents=True, exist_ok=True)
+    yaml_str = yaml.dump(config)
+    filepath.write_text(yaml_str)
+
+
 def test_get_mlflow_config(mocker, tmp_path, config_dir):
     # config_with_base_mlflow_conf is a pytest.fixture in conftest
+    mocker.patch("logging.config.dictConfig")
     mocker.patch("kedro_mlflow.utils._is_kedro_project", return_value=True)
 
-    def _write_yaml(filepath, config):
-        filepath.parent.mkdir(parents=True, exist_ok=True)
-        yaml_str = yaml.dump(config)
-        filepath.write_text(yaml_str)
-
     _write_yaml(
         tmp_path / "conf" / "base" / "mlflow.yml",
         dict(
@@ -35,4 +38,37 @@ def _write_yaml(filepath, config):
             "node": {"flatten_dict_params": True, "recursive": False, "sep": "-"}
         },
     }
-    assert get_mlflow_config(project_path=tmp_path, env="local").to_dict() == expected
+    context = load_context(tmp_path)
+    assert get_mlflow_config(context).to_dict() == expected
+
+
+def test_mlflow_config_with_templated_config(mocker, tmp_path, config_dir):
+
+    _write_yaml(
+        tmp_path / "conf" / "base" / "mlflow.yml",
+        dict(
+            mlflow_tracking_uri="${mlflow_tracking_uri}",
+            experiment=dict(name="fake_package", create=True),
+            run=dict(id="123456789", name="my_run", nested=True),
+            ui=dict(port="5151", host="localhost"),
+            hooks=dict(node=dict(flatten_dict_params=True, recursive=False, sep="-")),
+        ),
+    )
+
+    _write_yaml(
+        tmp_path / "conf" / "base" / "globals.yml",
+        dict(mlflow_tracking_uri="testruns"),
+    )
+
+    expected = {
+        "mlflow_tracking_uri": (tmp_path / "testruns").as_uri(),
+        "experiments": {"name": "fake_package", "create": True},
+        "run": {"id": "123456789", "name": "my_run", "nested": True},
+        "ui": {"port": "5151", "host": "localhost"},
+        "hooks": {
+            "node": {"flatten_dict_params": True, "recursive": False, "sep": "-"}
+        },
+    }
+
+    context = load_context(tmp_path)
+    assert get_mlflow_config(context).to_dict() == expected
diff --git a/tests/framework/hooks/test_node_hook.py b/tests/framework/hooks/test_node_hook.py
index 89182c7a..62266d30 100644
--- a/tests/framework/hooks/test_node_hook.py
+++ b/tests/framework/hooks/test_node_hook.py
@@ -1,14 +1,23 @@
+from pathlib import Path
+from typing import Dict
+
 import mlflow
 import pytest
+import yaml
 from kedro.io import DataCatalog, MemoryDataSet
-from kedro.pipeline import node
+from kedro.pipeline import Pipeline, node
 from mlflow.tracking import MlflowClient
 
-from kedro_mlflow.framework.context.config import KedroMlflowConfig
 from kedro_mlflow.framework.hooks import MlflowNodeHook
 from kedro_mlflow.framework.hooks.node_hook import flatten_dict
 
 
+def _write_yaml(filepath: Path, config: Dict):
+    filepath.parent.mkdir(parents=True, exist_ok=True)
+    yaml_str = yaml.dump(config)
+    filepath.write_text(yaml_str)
+
+
 def test_flatten_dict_non_nested():
     d = dict(a=1, b=2)
     assert flatten_dict(d=d, recursive=True, sep=".") == d
@@ -38,27 +47,27 @@ def test_flatten_dict_nested_2_levels():
     }
 
 
-@pytest.mark.parametrize(
-    "flatten_dict_params,expected",
-    [
-        (True, {"param1": "1", "parameters-param1": "1", "parameters-param2": "2"}),
-        (False, {"param1": "1", "parameters": "{'param1': 1, 'param2': 2}"}),
-    ],
-)
-def test_node_hook_logging(tmp_path, mocker, flatten_dict_params, expected):
+@pytest.fixture
+def dummy_run_params(tmp_path):
+    dummy_run_params = {
+        "run_id": "",
+        "project_path": tmp_path.as_posix(),
+        "env": "local",
+        "kedro_version": "0.16.4",
+        "tags": [],
+        "from_nodes": [],
+        "to_nodes": [],
+        "node_names": [],
+        "from_inputs": [],
+        "load_versions": [],
+        "pipeline_name": "my_cool_pipeline",
+        "extra_params": [],
+    }
+    return dummy_run_params
 
-    mocker.patch("kedro_mlflow.utils._is_kedro_project", return_value=True)
-    config = KedroMlflowConfig(
-        project_path=tmp_path,
-        node_hook_opts={"flatten_dict_params": flatten_dict_params, "sep": "-"},
-    )
-    # the function is imported inside the other file antd this is the file to patch
-    # see https://stackoverflow.com/questions/30987973/python-mock-patch-doesnt-work-as-expected-for-public-method
-    mocker.patch(
-        "kedro_mlflow.framework.hooks.node_hook.get_mlflow_config", return_value=config
-    )
-    mlflow_node_hook = MlflowNodeHook()
 
+@pytest.fixture
+def dummy_node():
     def fake_fun(arg1, arg2, arg3):
         return None
 
@@ -67,6 +76,21 @@ def fake_fun(arg1, arg2, arg3):
         inputs={"arg1": "params:param1", "arg2": "foo", "arg3": "parameters"},
         outputs="out",
     )
+
+    return node_test
+
+
+@pytest.fixture
+def dummy_pipeline(dummy_node):
+
+    dummy_pipeline = Pipeline([dummy_node])
+
+    return dummy_pipeline
+
+
+@pytest.fixture
+def dummy_catalog():
+
     catalog = DataCatalog(
         {
             "params:param1": 1,
@@ -75,14 +99,94 @@ def fake_fun(arg1, arg2, arg3):
             "parameters": {"param1": 1, "param2": 2},
         }
     )
-    node_inputs = {v: catalog._data_sets.get(v) for k, v in node_test._inputs.items()}
+
+    return catalog
+
+
+def test_pipeline_run_hook_getting_configs(
+    tmp_path, config_dir, monkeypatch, dummy_run_params, dummy_pipeline, dummy_catalog
+):
+
+    monkeypatch.chdir(tmp_path)
+
+    _write_yaml(
+        tmp_path / "conf" / "base" / "mlflow.yml",
+        dict(
+            hooks=dict(node=dict(flatten_dict_params=True, recursive=False, sep="-")),
+        ),
+    ),
+
+    mlflow_node_hook = MlflowNodeHook()
+    mlflow_node_hook.before_pipeline_run(
+        run_params=dummy_run_params, pipeline=dummy_pipeline, catalog=dummy_catalog
+    )
+
+    assert (
+        mlflow_node_hook.flatten,
+        mlflow_node_hook.recursive,
+        mlflow_node_hook.sep,
+    ) == (True, False, "-")
+
+
+@pytest.mark.parametrize(
+    "flatten_dict_params,expected",
+    [
+        (True, {"param1": "1", "parameters-param1": "1", "parameters-param2": "2"}),
+        (False, {"param1": "1", "parameters": "{'param1': 1, 'param2': 2}"}),
+    ],
+)
+def test_node_hook_logging(
+    tmp_path,
+    mocker,
+    monkeypatch,
+    dummy_run_params,
+    dummy_catalog,
+    dummy_pipeline,
+    dummy_node,
+    config_dir,
+    flatten_dict_params,
+    expected,
+):
+
+    mocker.patch("logging.config.dictConfig")
+    mocker.patch("kedro_mlflow.utils._is_kedro_project", return_value=True)
+    monkeypatch.chdir(tmp_path)
+    # config = KedroMlflowConfig(
+    #     project_path=tmp_path,
+    #     node_hook_opts={"flatten_dict_params": flatten_dict_params, "sep": "-"},
+    # )
+    # # the function is imported inside the other file antd this is the file to patch
+    # # see https://stackoverflow.com/questions/30987973/python-mock-patch-doesnt-work-as-expected-for-public-method
+    # mocker.patch(
+    #     "kedro_mlflow.framework.hooks.node_hook.get_mlflow_config", return_value=config
+    # )
+
+    _write_yaml(
+        tmp_path / "conf" / "base" / "mlflow.yml",
+        dict(
+            hooks=dict(
+                node=dict(
+                    flatten_dict_params=flatten_dict_params, recursive=False, sep="-"
+                )
+            ),
+        ),
+    ),
+
+    mlflow_node_hook = MlflowNodeHook()
+
+    node_inputs = {
+        v: dummy_catalog._data_sets.get(v) for k, v in dummy_node._inputs.items()
+    }
 
     mlflow_tracking_uri = (tmp_path / "mlruns").as_uri()
     mlflow.set_tracking_uri(mlflow_tracking_uri)
     with mlflow.start_run():
+        mlflow_node_hook.before_pipeline_run(
+            run_params=dummy_run_params, pipeline=dummy_pipeline, catalog=dummy_catalog
+        )
         mlflow_node_hook.before_node_run(
-            node=node_test,
-            catalog=catalog,
+            node=dummy_node,
+            catalog=dummy_catalog,
             inputs=node_inputs,
             is_async=False,
             run_id="132",
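
The parametrized expectations above summarize what ends up logged in MLflow. As a quick illustration of the flattening they rely on, here is the helper the hook calls, run on the same parameters the dummy catalog provides (a reading aid only; values are shown as plain Python objects, MLflow stringifies them when they are logged):

    from kedro_mlflow.framework.hooks.node_hook import flatten_dict

    params = {"param1": 1, "parameters": {"param1": 1, "param2": 2}}
    print(flatten_dict(d=params, recursive=False, sep="-"))
    # {'param1': 1, 'parameters-param1': 1, 'parameters-param2': 2}
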
diff --git a/tests/framework/hooks/test_pipeline_hook.py b/tests/framework/hooks/test_pipeline_hook.py
index e19bfc13..451ec4ef 100644
--- a/tests/framework/hooks/test_pipeline_hook.py
+++ b/tests/framework/hooks/test_pipeline_hook.py
@@ -4,7 +4,7 @@
 import pytest
 import yaml
 from kedro.extras.datasets.pickle import PickleDataSet
-from kedro.framework.context import KedroContext
+from kedro.framework.context import KedroContext, load_context
 from kedro.io import DataCatalog, MemoryDataSet
 from kedro.pipeline import Pipeline, node
 from kedro.runner import SequentialRunner
@@ -264,7 +264,8 @@ def test_mlflow_pipeline_hook_with_different_pipeline_types(
         run_params=dummy_run_params, pipeline=pipeline_to_run, catalog=dummy_catalog
     )
     # test : parameters should have been logged
-    mlflow_conf = get_mlflow_config(tmp_path)
+    context = load_context(tmp_path)
+    mlflow_conf = get_mlflow_config(context)
     mlflow_client = MlflowClient(mlflow_conf.mlflow_tracking_uri)
     run_data = mlflow_client.get_run(run_id).data
     # all run_params are recorded as tags