diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0ec2b322..283e3af9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,10 @@
 - Pin the kedro version to force it to be **strictly** inferior to `0.17` which is not compatible with current `kedro-mlflow` version ([#143](https://github.com/Galileo-Galilei/kedro-mlflow/issues/143))
 
+### Changed
+
+- The `KedroPipelineModel.load_context()` method now loads all the `DataSets` of the `DataCatalog` in memory. It is also now possible to specify the `runner` used to execute the model, as well as the `copy_mode` used when running the inference pipeline (instead of deep copying the datasets between each node, which is Kedro's default). This makes API serving with the `mlflow serve` command considerably faster (~20 times faster) for models which need compiling (e.g. keras, tensorflow) ([#133](https://github.com/Galileo-Galilei/kedro-mlflow/issues/133))
+
 ## [0.4.1] - 2020-12-03
 
 ### Added
diff --git a/docs/source/07_python_objects/03_Pipelines.md b/docs/source/07_python_objects/03_Pipelines.md
index de8fb561..44cfc064 100644
--- a/docs/source/07_python_objects/03_Pipelines.md
+++ b/docs/source/07_python_objects/03_Pipelines.md
@@ -64,3 +64,15 @@ mlflow.pyfunc.log_model(
     model_signature=model_signature
 )
 ```
+
+It is also possible to pass arguments to `KedroPipelineModel` to specify the runner and the `copy_mode` of the `MemoryDataSet`s used for the inference pipeline. This is often faster, especially for compiled models (e.g. keras, tensorflow), and better suited to an API serving pattern.
+
+```python
+KedroPipelineModel(
+    pipeline_ml=pipeline_training,
+    catalog=catalog,
+    copy_mode="assign",
+)
+```
+
+Available `copy_mode` values are "assign", "copy" and "deepcopy". It is also possible to pass a dictionary to specify a different copy mode for each dataset.
diff --git a/kedro_mlflow/framework/hooks/pipeline_hook.py b/kedro_mlflow/framework/hooks/pipeline_hook.py
index 43b33a64..92d7bbdf 100644
--- a/kedro_mlflow/framework/hooks/pipeline_hook.py
+++ b/kedro_mlflow/framework/hooks/pipeline_hook.py
@@ -151,7 +151,7 @@ def after_pipeline_run(
                 mlflow.pyfunc.log_model(
                     artifact_path=pipeline.model_name,
                     python_model=KedroPipelineModel(
-                        pipeline_ml=pipeline, catalog=pipeline_catalog
+                        pipeline_ml=pipeline, catalog=pipeline_catalog, **pipeline.kwargs
                     ),
                     artifacts=artifacts,
                     conda_env=_format_conda_env(pipeline.conda_env),
diff --git a/kedro_mlflow/mlflow/kedro_pipeline_model.py b/kedro_mlflow/mlflow/kedro_pipeline_model.py
index 234e27d3..f34b3655 100644
--- a/kedro_mlflow/mlflow/kedro_pipeline_model.py
+++ b/kedro_mlflow/mlflow/kedro_pipeline_model.py
@@ -1,21 +1,92 @@
 from copy import deepcopy
 from pathlib import Path
+from typing import Dict, Optional, Union
 
 from kedro.io import DataCatalog, MemoryDataSet
-from kedro.runner import SequentialRunner
+from kedro.runner import AbstractRunner, SequentialRunner
 from mlflow.pyfunc import PythonModel
 
 from kedro_mlflow.pipeline.pipeline_ml import PipelineML
 
 
 class KedroPipelineModel(PythonModel):
-    def __init__(self, pipeline_ml: PipelineML, catalog: DataCatalog):
+    def __init__(
+        self,
+        pipeline_ml: PipelineML,
+        catalog: DataCatalog,
+        runner: Optional[AbstractRunner] = None,
+        copy_mode: Optional[Union[Dict[str, str], str]] = None,
+    ):
+        """Mlflow PythonModel wrapping a kedro PipelineML and its DataCatalog.
+
+        Args:
+            pipeline_ml (PipelineML): A PipelineML object to
+            store as an Mlflow Model
+
+            catalog (DataCatalog): The DataCatalog associated
+            with the PipelineML
+
+            runner (Optional[AbstractRunner], optional): The kedro
+            AbstractRunner to use. Defaults to SequentialRunner if
+            None.
+
+            copy_mode (Optional[Union[Dict[str, str], str]]):
+            The copy_mode of each DataSet of the catalog
+            when reconstructing the DataCatalog in memory.
+            You can pass either:
+                - None to use Kedro's default mode for each dataset
+                - a single string ("deepcopy", "copy" or "assign")
+                to apply to all datasets
+                - a dictionary with (dataset name, copy_mode) key/value
+                pairs. The associated mode must be a valid kedro mode
+                ("deepcopy", "copy" or "assign") for each. Defaults to None.
+        """
         self.pipeline_ml = pipeline_ml
         self.initial_catalog = pipeline_ml._extract_pipeline_catalog(catalog)
-        self.loaded_catalog = DataCatalog()
         # we have the guarantee that there is only one output in inference
         self.output_name = list(pipeline_ml.inference.outputs())[0]
+        self.runner = runner or SequentialRunner()
+        self.copy_mode = copy_mode or {}
+
+        # copy_mode has been converted to a dict by the property setter
+        # TODO: we need to use the runner's default dataset in case of multithreading
+        self.loaded_catalog = DataCatalog(
+            data_sets={
+                name: MemoryDataSet(copy_mode=copy_mode)
+                for name, copy_mode in self.copy_mode.items()
+            }
+        )
+
+    @property
+    def copy_mode(self):
+        return self._copy_mode
+
+    @copy_mode.setter
+    def copy_mode(self, copy_mode):
+
+        if isinstance(copy_mode, str) or copy_mode is None:
+            # if it is a string, we must manually create the dictionary
+            # of all catalog entries with this copy_mode
+            self._copy_mode = {
+                name: copy_mode
+                for name in self.pipeline_ml.inference.data_sets()
+                if name != self.output_name
+            }
+        elif isinstance(copy_mode, dict):
+            # if it is a dict, it does not matter if it does not contain
+            # every catalog entry: the missing ones default to None,
+            # i.e. Kedro's default copy mode for a MemoryDataSet
+            self._copy_mode = {
+                name: None
+                for name in self.pipeline_ml.inference.data_sets()
+                if name != self.output_name
+            }
+            self._copy_mode.update(copy_mode)
+        else:
+            raise TypeError(
+                f"'copy_mode' must be a 'str' or a 'dict', not '{type(copy_mode)}'"
+            )
 
     def load_context(self, context):
 
@@ -42,23 +113,26 @@ def load_context(self, context):
             )
         )
 
-        self.loaded_catalog = deepcopy(self.initial_catalog)
+        updated_catalog = deepcopy(self.initial_catalog)
         for name, uri in context.artifacts.items():
-            self.loaded_catalog._data_sets[name]._filepath = Path(uri)
+            updated_catalog._data_sets[name]._filepath = Path(uri)
+            self.loaded_catalog.save(name=name, data=updated_catalog.load(name))
 
     def predict(self, context, model_input):
         # TODO : checkout out how to pass extra args in predict
        # for instance, to enable parallelization
-        self.loaded_catalog.add(
-            data_set_name=self.pipeline_ml.input_name,
-            data_set=MemoryDataSet(model_input),
-            replace=True,
+        self.loaded_catalog.save(
+            name=self.pipeline_ml.input_name,
+            data=model_input,
         )
-        runner = SequentialRunner()
-        run_outputs = runner.run(
+
+        run_output = self.runner.run(
             pipeline=self.pipeline_ml.inference, catalog=self.loaded_catalog
         )
-        return run_outputs[
-            self.output_name
-        ]  # unpack the result to avoid messing the json output
+
+        # unpack the result to avoid messing the json
+        # file with the name of the Kedro dataset
+        unpacked_output = run_output[self.output_name]
+
+        return unpacked_output
diff --git a/kedro_mlflow/pipeline/pipeline_ml.py b/kedro_mlflow/pipeline/pipeline_ml.py
index 9f08ddbb..8dd5e577 100644
--- a/kedro_mlflow/pipeline/pipeline_ml.py
+++ b/kedro_mlflow/pipeline/pipeline_ml.py
@@ -45,6 +45,7 @@ def __init__(
         conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
         model_name: Optional[str] = "model",
         model_signature: Union[ModelSignature, str, None] = "auto",
+        **kwargs,
     ):
         """Store all necessary information for calling
         mlflow.log_model in the pipeline.
@@ -86,6 +87,13 @@
             - If None, no signature is used
             - if a `ModelSignature` instance, passed
             to the underlying dataframe
+            kwargs:
+                extra arguments to be passed to `KedroPipelineModel`
+                when the PipelineML object is automatically saved at the end of a run.
+                This includes:
+                    - `copy_mode`: the copy_mode to be used for the underlying datasets
+                    when loaded in memory
+                    - `runner`: the kedro runner to run the model with
         """
 
         super().__init__(nodes, *args, tags=tags)
@@ -95,7 +103,7 @@ def __init__(
         self.model_name = model_name
         self.input_name = input_name
         self.model_signature = model_signature
-
+        self.kwargs = kwargs  # stored so it can be reused when saving the model within a hook
         self._check_consistency()
 
     @property
diff --git a/kedro_mlflow/pipeline/pipeline_ml_factory.py b/kedro_mlflow/pipeline/pipeline_ml_factory.py
index 4db7bb4a..aefc1755 100644
--- a/kedro_mlflow/pipeline/pipeline_ml_factory.py
+++ b/kedro_mlflow/pipeline/pipeline_ml_factory.py
@@ -14,6 +14,7 @@ def pipeline_ml_factory(
     conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
     model_name: Optional[str] = "model",
     model_signature: Union[ModelSignature, str, None] = "auto",
+    **kwargs
 ) -> PipelineML:
     """This function is a helper to create `PipelineML`
     object directly from two Kedro `Pipelines` (one of
@@ -49,12 +50,19 @@ def pipeline_ml_factory(
             the folder where the model will be stored in
             remote mlflow. Defaults to "model".
         model_signature (Union[ModelSignature, bool]): The mlflow
-        signature of the input dataframe common to training
-        and inference.
-        - If 'auto', it is infered automatically
-        - If None, no signature is used
-        - if a `ModelSignature` instance, passed
-        to the underlying dataframe
+            signature of the input dataframe common to training
+            and inference.
+            - If 'auto', it is inferred automatically
+            - If None, no signature is used
+            - if a `ModelSignature` instance, passed
+            to the underlying dataframe
+        kwargs:
+            extra arguments to be passed to `KedroPipelineModel`
+            when the PipelineML object is automatically saved at the end of a run.
+            This includes:
+                - `copy_mode`: the copy_mode to be used for the underlying datasets
+                when loaded in memory
+                - `runner`: the kedro runner to run the model with
 
     Returns:
         PipelineML: A `PipelineML` which is automatically
@@ -70,5 +78,6 @@
         conda_env=conda_env,
         model_name=model_name,
         model_signature=model_signature,
+        **kwargs
     )
     return pipeline
diff --git a/tests/framework/hooks/test_pipeline_hook.py b/tests/framework/hooks/test_pipeline_hook.py
index ed6a47cd..c833b49e 100644
--- a/tests/framework/hooks/test_pipeline_hook.py
+++ b/tests/framework/hooks/test_pipeline_hook.py
@@ -317,6 +317,75 @@ def test_mlflow_pipeline_hook_with_different_pipeline_types(
     }
 
 
+@pytest.mark.parametrize(
+    "copy_mode,expected",
+    [
+        (None, {"raw_data": None, "data": None, "model": None}),
+        ("assign", {"raw_data": "assign", "data": "assign", "model": "assign"}),
+        ("deepcopy", {"raw_data": "deepcopy", "data": "deepcopy", "model": "deepcopy"}),
+        ({"model": "assign"}, {"raw_data": None, "data": None, "model": "assign"}),
+    ],
+)
+def test_mlflow_pipeline_hook_with_copy_mode(
+    mocker,
+    monkeypatch,
+    tmp_path,
+    config_dir,
+    dummy_pipeline_ml,
+    dummy_catalog,
+    dummy_run_params,
+    dummy_mlflow_conf,
+    copy_mode,
+    expected,
+):
+    # config_with_base_mlflow_conf is a conftest fixture
+    mocker.patch("kedro_mlflow.utils._is_kedro_project", return_value=True)
+    monkeypatch.chdir(tmp_path)
+    pipeline_hook = MlflowPipelineHook()
+    runner = SequentialRunner()
+
+    pipeline_hook.after_catalog_created(
+        catalog=dummy_catalog,
+        # `after_catalog_created` is not using any of the arguments below,
+        # so we are setting them to empty values.
+        conf_catalog={},
+        conf_creds={},
+        feed_dict={},
+        save_version="",
+        load_versions="",
+        run_id=dummy_run_params["run_id"],
+    )
+
+    pipeline_to_run = pipeline_ml_factory(
+        training=dummy_pipeline_ml.training,
+        inference=dummy_pipeline_ml.inference,
+        input_name=dummy_pipeline_ml.input_name,
+        conda_env={},
+        model_name=dummy_pipeline_ml.model_name,
+        copy_mode=copy_mode,
+    )
+    pipeline_hook.before_pipeline_run(
+        run_params=dummy_run_params, pipeline=pipeline_to_run, catalog=dummy_catalog
+    )
+    runner.run(pipeline_to_run, dummy_catalog)
+    run_id = mlflow.active_run().info.run_id
+    pipeline_hook.after_pipeline_run(
+        run_params=dummy_run_params, pipeline=pipeline_to_run, catalog=dummy_catalog
+    )
+
+    mlflow_tracking_uri = (tmp_path / "mlruns").as_uri()
+    mlflow.set_tracking_uri(mlflow_tracking_uri)
+
+    loaded_model = mlflow.pyfunc.load_model(model_uri=f"runs:/{run_id}/model")
+
+    actual_copy_mode = {
+        name: ds._copy_mode
+        for name, ds in loaded_model._model_impl.python_model.loaded_catalog._data_sets.items()
+    }
+
+    assert actual_copy_mode == expected
+
+
 def test_mlflow_pipeline_hook_metrics_with_run_id(
     mocker,
     monkeypatch,
diff --git a/tests/mlflow/test_kedro_pipeline_model.py b/tests/mlflow/test_kedro_pipeline_model.py
index bbf0acc3..cfd0c52d 100644
--- a/tests/mlflow/test_kedro_pipeline_model.py
+++ b/tests/mlflow/test_kedro_pipeline_model.py
@@ -51,9 +51,9 @@ def predict_fun(model, data):
     return pipeline_ml_obj
 
 
-def test_model_packaging(tmp_path, pipeline_ml_obj):
-
-    catalog = DataCatalog(
+@pytest.fixture
+def dummy_catalog(tmp_path):
+    dummy_catalog = DataCatalog(
         {
             "raw_data": MemoryDataSet(),
             "data": MemoryDataSet(),
@@ -62,12 +62,29 @@ def test_model_packaging(tmp_path, pipeline_ml_obj):
             ),
         }
     )
+    return dummy_catalog
 
-    catalog._data_sets["model"].save(2)  # emulate model fitting
 
-    artifacts = pipeline_ml_obj.extract_pipeline_artifacts(catalog)
+@pytest.mark.parametrize(
+    "copy_mode,expected",
+    [
+        (None, {"raw_data": None, "data": None, "model": None}),
+        ("assign", {"raw_data": "assign", "data": "assign", "model": "assign"}),
+        ("deepcopy", {"raw_data": "deepcopy", "data": "deepcopy", "model": "deepcopy"}),
+        ({"model": "assign"}, {"raw_data": None, "data": None, "model": "assign"}),
+    ],
+)
+def test_model_packaging_with_copy_mode(
+    tmp_path, pipeline_ml_obj, dummy_catalog, copy_mode, expected
+):
 
-    kedro_model = KedroPipelineModel(pipeline_ml=pipeline_ml_obj, catalog=catalog)
+    dummy_catalog._data_sets["model"].save(2)  # emulate model fitting
+
+    artifacts = pipeline_ml_obj.extract_pipeline_artifacts(dummy_catalog)
+
+    kedro_model = KedroPipelineModel(
+        pipeline_ml=pipeline_ml_obj, catalog=dummy_catalog, copy_mode=copy_mode
+    )
 
     mlflow_tracking_uri = (tmp_path / "mlruns").as_uri()
     mlflow.set_tracking_uri(mlflow_tracking_uri)
@@ -80,16 +97,31 @@ def test_model_packaging(tmp_path, pipeline_ml_obj):
     )
     run_id = mlflow.active_run().info.run_id
 
-    loaded_model = mlflow.pyfunc.load_model(
-        model_uri=(Path(r"runs:/") / run_id / "model").as_posix()
-    )
+    loaded_model = mlflow.pyfunc.load_model(model_uri=f"runs:/{run_id}/model")
+
+    # first assertion: prediction works
     assert loaded_model.predict(1) == 2
 
+    # second assertion: copy_mode works
+    actual_copy_mode = {
+        name: ds._copy_mode
+        for name, ds in loaded_model._model_impl.python_model.loaded_catalog._data_sets.items()
+    }
+
+    assert actual_copy_mode == expected
+
+
+def test_kedro_pipeline_ml_with_wrong_copy_mode_type(pipeline_ml_obj, dummy_catalog):
+    with pytest.raises(TypeError, match="'copy_mode' must be a 'str' or a 'dict'"):
+        KedroPipelineModel(
+            pipeline_ml=pipeline_ml_obj, catalog=dummy_catalog, copy_mode=1346
+        )
+
 
 # should very likely add tests to see what happens when the artifacts
 # are incorrect
 # incomplete
-# contains to input_name
+# contains no input_name
 # some memory datasets inside the catalog are persisted?
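
Reviewer note (not part of the diff): below is a minimal sketch of how the new `copy_mode` and `runner` keyword arguments flow from `pipeline_ml_factory` through `PipelineML.kwargs` to `KedroPipelineModel` when `MlflowPipelineHook` logs the model at the end of a run. The node functions and dataset names (`fit_model`, `predict`, `data`, `predictions`) are illustrative placeholders, not identifiers from this repository.

```python
# Hypothetical usage sketch: functions and dataset names are placeholders.
from kedro.pipeline import Pipeline, node
from kedro.runner import SequentialRunner

from kedro_mlflow.pipeline import pipeline_ml_factory


def fit_model(data):
    # stand-in for fitting a real estimator
    return 2


def predict(model, data):
    # stand-in for a real prediction function
    return data * model


training = Pipeline([node(fit_model, inputs="data", outputs="model")])
inference = Pipeline([node(predict, inputs=["model", "data"], outputs="predictions")])

pipeline_training = pipeline_ml_factory(
    training=training,
    inference=inference,
    input_name="data",
    model_name="model",
    # extra kwargs stored on PipelineML.kwargs and forwarded to KedroPipelineModel
    # by MlflowPipelineHook.after_pipeline_run:
    copy_mode={"model": "assign"},  # per-dataset copy mode; others keep Kedro's default
    runner=SequentialRunner(),  # runner used to execute the inference pipeline
)
```

When such a model is later served (e.g. with the `mlflow serve` command mentioned in the changelog), `KedroPipelineModel.load_context()` rebuilds the `DataCatalog` in memory with these copy modes, which is where the reported speed-up for compiled models comes from.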