FIX #133 - Enable to specify runner and copy_mode for KedroPipelineModel.predict to decrease inference time
Galileo-Galilei committed Jan 2, 2021
1 parent 232efe0 commit 82e8231
Showing 8 changed files with 240 additions and 32 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,10 @@

- Pin the kedro version to force it to be **strictly** lower than `0.17`, which is not compatible with the current `kedro-mlflow` version ([#143](https://github.com/Galileo-Galilei/kedro-mlflow/issues/143))

### Changed

- The `KedroPipelineModel.load_context()` method now loads all the `DataSets` of the `DataCatalog` in memory. It is also now possible to specify the `runner` used to execute the model as well as the `copy_mode` used when running the inference pipeline (instead of deepcopying the datasets between each node, which is Kedro's default). This makes API serving with the `mlflow serve` command considerably faster (~20 times faster) for models that need compiling (e.g. keras, tensorflow) ([#133](https://github.com/Galileo-Galilei/kedro-mlflow/issues/133))

## [0.4.1] - 2020-12-03

### Added
12 changes: 12 additions & 0 deletions docs/source/07_python_objects/03_Pipelines.md
@@ -64,3 +64,15 @@ mlflow.pyfunc.log_model(
model_signature=model_signature
)
```

It is also possible to pass arguments to `KedroPipelineModel` to specify the `runner` or the `copy_mode` of the `MemoryDataSet`s used by the inference `Pipeline`. This can be much faster, especially for compiled models (e.g. keras, tensorflow), and is better suited to an API serving pattern.

```python
KedroPipelineModel(
pipeline_ml=pipeline_training,
catalog=catalog,
copy_mode="assign"
)
```

The available `copy_mode` values are "assign", "copy" and "deepcopy". It is also possible to pass a dictionary to specify a different copy mode for each dataset.
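
For instance, a per-dataset copy mode can be combined with an explicit runner. The sketch below reuses the `pipeline_training` and `catalog` objects from the example above; the `"data"` entry is a hypothetical dataset name that must exist in your inference pipeline:

```python
from kedro.runner import SequentialRunner

KedroPipelineModel(
    pipeline_ml=pipeline_training,
    catalog=catalog,
    runner=SequentialRunner(),  # the default when no runner is given
    copy_mode={"data": "assign"},  # datasets not listed keep Kedro's default copy mode
)
```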
2 changes: 1 addition & 1 deletion kedro_mlflow/framework/hooks/pipeline_hook.py
@@ -151,7 +151,7 @@ def after_pipeline_run(
mlflow.pyfunc.log_model(
artifact_path=pipeline.model_name,
python_model=KedroPipelineModel(
pipeline_ml=pipeline, catalog=pipeline_catalog
pipeline_ml=pipeline, catalog=pipeline_catalog, **pipeline.kwargs
),
artifacts=artifacts,
conda_env=_format_conda_env(pipeline.conda_env),
102 changes: 88 additions & 14 deletions kedro_mlflow/mlflow/kedro_pipeline_model.py
@@ -1,21 +1,92 @@
from copy import deepcopy
from pathlib import Path
from typing import Dict, Optional, Union

from kedro.io import DataCatalog, MemoryDataSet
from kedro.runner import SequentialRunner
from kedro.runner import AbstractRunner, SequentialRunner
from mlflow.pyfunc import PythonModel

from kedro_mlflow.pipeline.pipeline_ml import PipelineML


class KedroPipelineModel(PythonModel):
def __init__(self, pipeline_ml: PipelineML, catalog: DataCatalog):
def __init__(
self,
pipeline_ml: PipelineML,
catalog: DataCatalog,
runner: Optional[AbstractRunner] = None,
copy_mode: Optional[Union[Dict[str, str], str]] = None,
):
"""[summary]
Args:
pipeline_ml (PipelineML): A PipelineML object to
store as a Mlflow Model
catalog (DataCatalog): The DataCatalog associated
to the PipelineMl
runner (Optional[AbstractRunner], optional): The kedro
AbstractRunner to use. Defaults to SequentialRunner if
None.
copy_mode (Optional[Union[Dict[str,str], str]]):
The copy_mode of each DataSet of the catalog
when reconstructing the DataCatalog in memory.
You can pass either:
- None to use Kedro default mode for each dataset
- a single string ("deepcopy", "copy" or "assign")
to apply to all datasets
- a dictionary with (dataset name, copy_mode) key/value
pairs. The associated mode must be a valid kedro mode
("deepcopy", "copy" or "assign") for each. Defaults to None.
"""

self.pipeline_ml = pipeline_ml
self.initial_catalog = pipeline_ml._extract_pipeline_catalog(catalog)
self.loaded_catalog = DataCatalog()
# we have the guarantee that there is only one output in inference
self.output_name = list(pipeline_ml.inference.outputs())[0]
self.runner = runner or SequentialRunner()
self.copy_mode = copy_mode or {}

# copy_mode has already been expanded to a {dataset_name: mode} dict by the property setter
# TODO: we need to use the runner's default dataset in case of multithreading
self.loaded_catalog = DataCatalog(
data_sets={
name: MemoryDataSet(copy_mode=copy_mode)
for name, copy_mode in self.copy_mode.items()
}
)

@property
def copy_mode(self):
return self._copy_mode

@copy_mode.setter
def copy_mode(self, copy_mode):

if isinstance(copy_mode, str) or copy_mode is None:
# if it is a string (or None), manually build the dictionary
# mapping every inference dataset to this copy_mode
self._copy_mode = {
name: copy_mode
for name in self.pipeline_ml.inference.data_sets()
if name != self.output_name
}
elif isinstance(copy_mode, dict):
# if it is a dict, it does not need to contain every catalog entry:
# datasets that are not listed default to None (i.e. Kedro's default copy mode)
# and the user-provided modes override these defaults
self._copy_mode = {
name: None
for name in self.pipeline_ml.inference.data_sets()
if name != self.output_name
}
self._copy_mode.update(copy_mode)
else:
raise TypeError(
f"'copy_mode' must be a 'str' or a 'dict', not '{type(copy_mode)}'"
)

def load_context(self, context):

Expand All @@ -42,23 +113,26 @@ def load_context(self, context):
)
)

self.loaded_catalog = deepcopy(self.initial_catalog)
updated_catalog = deepcopy(self.initial_catalog)
for name, uri in context.artifacts.items():
self.loaded_catalog._data_sets[name]._filepath = Path(uri)
updated_catalog._data_sets[name]._filepath = Path(uri)
self.loaded_catalog.save(name=name, data=updated_catalog.load(name))

def predict(self, context, model_input):
# TODO: check out how to pass extra args in predict
# for instance, to enable parallelization

self.loaded_catalog.add(
data_set_name=self.pipeline_ml.input_name,
data_set=MemoryDataSet(model_input),
replace=True,
self.loaded_catalog.save(
name=self.pipeline_ml.input_name,
data=model_input,
)
runner = SequentialRunner()
run_outputs = runner.run(

run_output = self.runner.run(
pipeline=self.pipeline_ml.inference, catalog=self.loaded_catalog
)
return run_outputs[
self.output_name
] # unpack the result to avoid messing the json output

# unpack the result to avoid cluttering the json
# output with the name of the Kedro dataset
unpacked_output = run_output[self.output_name]

return unpacked_output
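
As an illustration of how the `copy_mode` property setter normalizes its input, here is a minimal sketch; it assumes the `pipeline_ml_obj` and `dummy_catalog` fixtures defined in the tests further down (dataset names `raw_data`, `data` and `model`):

```python
# a single string is expanded to every inference dataset except the output
model = KedroPipelineModel(
    pipeline_ml=pipeline_ml_obj, catalog=dummy_catalog, copy_mode="assign"
)
assert model.copy_mode == {"raw_data": "assign", "data": "assign", "model": "assign"}

# a dict only overrides the listed datasets, the others fall back to None (Kedro's default)
model = KedroPipelineModel(
    pipeline_ml=pipeline_ml_obj, catalog=dummy_catalog, copy_mode={"model": "assign"}
)
assert model.copy_mode == {"raw_data": None, "data": None, "model": "assign"}

# anything else raises
# TypeError: 'copy_mode' must be a 'str' or a 'dict', not '<class 'int'>'
KedroPipelineModel(pipeline_ml=pipeline_ml_obj, catalog=dummy_catalog, copy_mode=1346)
```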
10 changes: 9 additions & 1 deletion kedro_mlflow/pipeline/pipeline_ml.py
@@ -45,6 +45,7 @@ def __init__(
conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
model_name: Optional[str] = "model",
model_signature: Union[ModelSignature, str, None] = "auto",
**kwargs,
):

"""Store all necessary information for calling mlflow.log_model in the pipeline.
@@ -86,6 +87,13 @@
- If None, no signature is used
- if a `ModelSignature` instance, passed
to the underlying dataframe
kwargs:
extra arguments to be passed to `KedroPipelineModel`
when the PipelineML object is automatically saved at the end of a run.
This includes:
- `copy_mode`: the copy_mode to be used for the underlying datasets
when loaded in memory
- `runner`: the kedro runner to run the model with
"""

super().__init__(nodes, *args, tags=tags)
@@ -95,7 +103,7 @@ def __init__(
self.model_name = model_name
self.input_name = input_name
self.model_signature = model_signature

self.kwargs = kwargs  # stored so the hook can reuse them when saving the model at the end of a run
self._check_consistency()

@property
21 changes: 15 additions & 6 deletions kedro_mlflow/pipeline/pipeline_ml_factory.py
@@ -14,6 +14,7 @@ def pipeline_ml_factory(
conda_env: Optional[Union[str, Path, Dict[str, Any]]] = None,
model_name: Optional[str] = "model",
model_signature: Union[ModelSignature, str, None] = "auto",
**kwargs
) -> PipelineML:
"""This function is a helper to create `PipelineML`
object directly from two Kedro `Pipelines` (one of
@@ -49,12 +50,19 @@
the folder where the model will be stored in
remote mlflow. Defaults to "model".
model_signature (Union[ModelSignature, bool]): The mlflow
signature of the input dataframe common to training
and inference.
- If 'auto', it is inferred automatically
- If None, no signature is used
- if a `ModelSignature` instance, passed
to the underlying dataframe
signature of the input dataframe common to training
and inference.
- If 'auto', it is inferred automatically
- If None, no signature is used
- if a `ModelSignature` instance, passed
to the underlying dataframe
kwargs:
extra arguments to be passed to `KedroPipelineModel`
when the PipelineML object is automatically saved at the end of a run.
This includes:
- `copy_mode`: the copy_mode to be used for the underlying datasets
when loaded in memory
- `runner`: the kedro runner to run the model with
Returns:
PipelineML: A `PipelineML` which is automatically
@@ -70,5 +78,6 @@
conda_env=conda_env,
model_name=model_name,
model_signature=model_signature,
**kwargs
)
return pipeline
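
As a usage sketch, the extra keyword arguments are stored on the `PipelineML` object and forwarded to `KedroPipelineModel` by the pipeline hook when the model is logged at the end of a run. `training_pipeline`, `inference_pipeline` and `"instances"` below are hypothetical placeholders:

```python
from kedro.runner import SequentialRunner

pipeline = pipeline_ml_factory(
    training=training_pipeline,  # hypothetical training Pipeline
    inference=inference_pipeline,  # hypothetical inference Pipeline
    input_name="instances",  # hypothetical input dataset name
    model_name="model",
    copy_mode="assign",  # forwarded to KedroPipelineModel via **kwargs
    runner=SequentialRunner(),  # idem; SequentialRunner is already the default
)
```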
69 changes: 69 additions & 0 deletions tests/framework/hooks/test_pipeline_hook.py
@@ -317,6 +317,75 @@ def test_mlflow_pipeline_hook_with_different_pipeline_types(
}


@pytest.mark.parametrize(
"copy_mode,expected",
[
(None, {"raw_data": None, "data": None, "model": None}),
("assign", {"raw_data": "assign", "data": "assign", "model": "assign"}),
("deepcopy", {"raw_data": "deepcopy", "data": "deepcopy", "model": "deepcopy"}),
({"model": "assign"}, {"raw_data": None, "data": None, "model": "assign"}),
],
)
def test_mlflow_pipeline_hook_with_copy_mode(
mocker,
monkeypatch,
tmp_path,
config_dir,
dummy_pipeline_ml,
dummy_catalog,
dummy_run_params,
dummy_mlflow_conf,
copy_mode,
expected,
):
# config_with_base_mlflow_conf is a conftest fixture
mocker.patch("kedro_mlflow.utils._is_kedro_project", return_value=True)
monkeypatch.chdir(tmp_path)
pipeline_hook = MlflowPipelineHook()
runner = SequentialRunner()

pipeline_hook.after_catalog_created(
catalog=dummy_catalog,
# `after_catalog_created` is not using any of the arguments below,
# so we are setting them to empty values.
conf_catalog={},
conf_creds={},
feed_dict={},
save_version="",
load_versions="",
run_id=dummy_run_params["run_id"],
)

pipeline_to_run = pipeline_ml_factory(
training=dummy_pipeline_ml.training,
inference=dummy_pipeline_ml.inference,
input_name=dummy_pipeline_ml.input_name,
conda_env={},
model_name=dummy_pipeline_ml.model_name,
copy_mode=copy_mode,
)
pipeline_hook.before_pipeline_run(
run_params=dummy_run_params, pipeline=pipeline_to_run, catalog=dummy_catalog
)
runner.run(pipeline_to_run, dummy_catalog)
run_id = mlflow.active_run().info.run_id
pipeline_hook.after_pipeline_run(
run_params=dummy_run_params, pipeline=pipeline_to_run, catalog=dummy_catalog
)

mlflow_tracking_uri = (tmp_path / "mlruns").as_uri()
mlflow.set_tracking_uri(mlflow_tracking_uri)

loaded_model = mlflow.pyfunc.load_model(model_uri=f"runs:/{run_id}/model")

actual_copy_mode = {
name: ds._copy_mode
for name, ds in loaded_model._model_impl.python_model.loaded_catalog._data_sets.items()
}

assert actual_copy_mode == expected


def test_mlflow_pipeline_hook_metrics_with_run_id(
mocker,
monkeypatch,
52 changes: 42 additions & 10 deletions tests/mlflow/test_kedro_pipeline_model.py
@@ -51,9 +51,9 @@ def predict_fun(model, data):
return pipeline_ml_obj


def test_model_packaging(tmp_path, pipeline_ml_obj):

catalog = DataCatalog(
@pytest.fixture
def dummy_catalog(tmp_path):
dummy_catalog = DataCatalog(
{
"raw_data": MemoryDataSet(),
"data": MemoryDataSet(),
@@ -62,12 +62,29 @@ def test_model_packaging(tmp_path, pipeline_ml_obj):
),
}
)
return dummy_catalog

catalog._data_sets["model"].save(2) # emulate model fitting

artifacts = pipeline_ml_obj.extract_pipeline_artifacts(catalog)
@pytest.mark.parametrize(
"copy_mode,expected",
[
(None, {"raw_data": None, "data": None, "model": None}),
("assign", {"raw_data": "assign", "data": "assign", "model": "assign"}),
("deepcopy", {"raw_data": "deepcopy", "data": "deepcopy", "model": "deepcopy"}),
({"model": "assign"}, {"raw_data": None, "data": None, "model": "assign"}),
],
)
def test_model_packaging_with_copy_mode(
tmp_path, pipeline_ml_obj, dummy_catalog, copy_mode, expected
):

kedro_model = KedroPipelineModel(pipeline_ml=pipeline_ml_obj, catalog=catalog)
dummy_catalog._data_sets["model"].save(2) # emulate model fitting

artifacts = pipeline_ml_obj.extract_pipeline_artifacts(dummy_catalog)

kedro_model = KedroPipelineModel(
pipeline_ml=pipeline_ml_obj, catalog=dummy_catalog, copy_mode=copy_mode
)

mlflow_tracking_uri = (tmp_path / "mlruns").as_uri()
mlflow.set_tracking_uri(mlflow_tracking_uri)
@@ -80,16 +97,31 @@ def test_model_packaging(tmp_path, pipeline_ml_obj):
)
run_id = mlflow.active_run().info.run_id

loaded_model = mlflow.pyfunc.load_model(
model_uri=(Path(r"runs:/") / run_id / "model").as_posix()
)
loaded_model = mlflow.pyfunc.load_model(model_uri=f"runs:/{run_id}/model")

# first assertion: prediction works
assert loaded_model.predict(1) == 2

# second assertion: copy_mode works
actual_copy_mode = {
name: ds._copy_mode
for name, ds in loaded_model._model_impl.python_model.loaded_catalog._data_sets.items()
}

assert actual_copy_mode == expected


def test_kedro_pipeline_ml_with_wrong_copy_mode_type(pipeline_ml_obj, dummy_catalog):
with pytest.raises(TypeError, match="'copy_mode' must be a 'str' or a 'dict'"):
KedroPipelineModel(
pipeline_ml=pipeline_ml_obj, catalog=dummy_catalog, copy_mode=1346
)


# should very likely add tests to see what happens when the artifacts
# are incorrect
# are incomplete
# contain two input_name entries
# contain no input_name
# some memory datasets inside the catalog are persisted?


Expand Down

0 comments on commit 82e8231

Please sign in to comment.