Convert protobuf model to native format #550

Merged 11 commits on Nov 18, 2022
2 changes: 1 addition & 1 deletion .github/workflows/taskrunner.yml
@@ -29,4 -29,4 @@ jobs:
pip install .
- name: Test TaskRunner API
run: |
- bash tests/github/test_hello_federation.sh keras_cnn_mnist aggregator col1 col2 $(hostname --all-fqdns | awk '{print $1}') --rounds-to-train 3
+ bash tests/github/test_hello_federation.sh keras_cnn_mnist aggregator col1 col2 $(hostname --all-fqdns | awk '{print $1}') --rounds-to-train 3 --save-model output_model
25 changes: 23 additions & 2 deletions docs/running_the_federation.rst
@@ -782,10 +782,10 @@ However, continue with the following procedure for details in creating a federation
.. _creating_workspaces:


- STEP 1: Create a Workspace on the Aggregator
+ STEP 1: Create a Workspace
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- 1. Start a Python 3.8 (>=3.6, <3.9) virtual environment and confirm |productName| is available.
+ 1. Start a Python 3.8 (>=3.6, <3.11) virtual environment and confirm |productName| is available.

.. code-block:: python

@@ -1112,6 +1112,27 @@ STEP 3: Start the Federation
When the last round of training is completed, the Aggregator stores the final weights in the protobuf file that was specified in the YAML file, which in this example is located at **save/${WORKSPACE_TEMPLATE}_latest.pbuf**.


Post Experiment
^^^^^^^^^^^^^^^

Experiment owners may access the final model in its native format.
Among other training artifacts, the aggregator creates snapshots of the last and the best aggregated models (the best being the one with the highest validation score). A snapshot can be converted to the native format and saved to disk by running the following command from the workspace:

.. code-block:: console

fx model save -i model_protobuf_path.pbuf -o save_model_path

In order for this command to succeed, the **TaskRunner** used in the experiment must implement a :code:`save_native()` method.
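As a rough illustration of that contract, any object exposing a :code:`save_native()` method would satisfy it. The class below is a hypothetical stand-in, not part of |productName|:

```python
# Hypothetical stand-in for a TaskRunner, sketching the save_native()
# contract that `fx model save` relies on. A real runner would delegate
# to its framework's serializer (e.g. torch.save or Keras model.save).
class DemoTaskRunner:
    def __init__(self, weights):
        self.weights = weights

    def save_native(self, filepath):
        # Plain-text dump for illustration only.
        with open(filepath, "w") as f:
            f.write(repr(self.weights))
```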

Another way to access the trained model is to call the API directly from a Python script:

.. code-block:: python

from openfl import get_model
model = get_model(plan_config, cols_config, data_config, model_protobuf_path)

The :code:`get_model()` function returns a **TaskRunner** object loaded with the chosen model snapshot; users may then work with the loaded model as a regular Python object.


.. _running_the_federation_docker:

3 changes: 2 additions & 1 deletion openfl/__init__.py
@@ -1,5 +1,6 @@
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""openfl base package."""
- # flake8: noqa
from .__version__ import __version__
+ # flake8: noqa
+ from .interface.model import get_model
109 changes: 109 additions & 0 deletions openfl/interface/model.py
@@ -0,0 +1,109 @@
# Copyright (C) 2020-2022 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Model CLI module."""

from click import confirm
from click import group
from click import option
from click import pass_context
from click import style
from click import Path as ClickPath
from logging import getLogger
from pathlib import Path

from openfl.federated import Plan
from openfl.federated import TaskRunner
from openfl.protocols import utils
from openfl.pipelines import NoCompressionPipeline
from openfl.utilities.workspace import set_directory

logger = getLogger(__name__)


@group()
@pass_context
def model(context):
"""Manage Federated Learning Models."""
context.obj['group'] = 'model'


@model.command(name='save')
@pass_context
@option('-i', '--input', 'model_protobuf_path', required=True,
help='The model protobuf to convert',
type=ClickPath(exists=True))
@option('-o', '--output', 'output_filepath', required=False,
help='Filename the model will be saved to in native format',
default='output_model', type=ClickPath(writable=True))
@option('-p', '--plan-config', required=False,
help='Federated learning plan [plan/plan.yaml]',
default='plan/plan.yaml', type=ClickPath(exists=True))
@option('-c', '--cols-config', required=False,
help='Authorized collaborator list [plan/cols.yaml]',
default='plan/cols.yaml', type=ClickPath(exists=True))
@option('-d', '--data-config', required=False,
help='The data set/shard configuration file [plan/data.yaml]',
default='plan/data.yaml', type=ClickPath(exists=True))
def save_(context, plan_config, cols_config, data_config, model_protobuf_path, output_filepath):
"""
Save the model in native format (PyTorch / Keras).
"""
output_filepath = Path(output_filepath).absolute()
if output_filepath.exists():
if not confirm(style(
'Do you want to overwrite the {}?'.format(output_filepath), fg='red', bold=True
)):
logger.info('Exiting')
context.obj['fail'] = True
return

task_runner = get_model(plan_config, cols_config, data_config, model_protobuf_path)

task_runner.save_native(output_filepath)
logger.info(f'Saved model in native format: 🠆 {output_filepath}')


def get_model(
plan_config: str,
cols_config: str,
data_config: str,
model_protobuf_path: str
) -> TaskRunner:
"""
Initialize TaskRunner and load it with provided model.pbuf.

Contrary to its name, this function returns a TaskRunner instance.
The reason for this behavior is the flexibility of the TaskRunner interface and
the diversity of the ways we store models in our template workspaces.
"""

# Here we change cwd to the experiment workspace folder
# because plan.yaml usually contains relative paths to components.
workspace_path = Path(plan_config).resolve().parent.parent
plan_config = Path(plan_config).resolve().relative_to(workspace_path)
cols_config = Path(cols_config).resolve().relative_to(workspace_path)
data_config = Path(data_config).resolve().relative_to(workspace_path)

with set_directory(workspace_path):
plan = Plan.parse(
plan_config_path=plan_config,
cols_config_path=cols_config,
data_config_path=data_config
)
collaborator_name = list(plan.cols_data_paths)[0]
data_loader = plan.get_data_loader(collaborator_name)
task_runner = plan.get_task_runner(data_loader=data_loader)

model_protobuf_path = Path(model_protobuf_path).resolve()
logger.info(f'Loading OpenFL model protobuf: 🠆 {model_protobuf_path}')

model_protobuf = utils.load_proto(model_protobuf_path)

tensor_dict, _ = utils.deconstruct_model_proto(model_protobuf, NoCompressionPipeline())

# This may break for multiple models.
# task_runner.set_tensor_dict will need to handle multiple models
task_runner.set_tensor_dict(tensor_dict, with_opt_vars=False)

del task_runner.data_loader
return task_runner
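The path handling at the top of `get_model()` can be sketched in isolation: the workspace root is taken to be two levels above `plan.yaml`, and each config path is re-expressed relative to that root so it stays valid after the cwd switch. A standalone, stdlib-only sketch with illustrative paths:

```python
# Standalone sketch of the path normalization in get_model(): resolve the
# plan path, derive the workspace root (two levels up), and re-express the
# plan path relative to that root so it survives a later os.chdir().
from pathlib import Path


def normalize_to_workspace(plan_config: str):
    workspace_path = Path(plan_config).resolve().parent.parent
    relative_plan = Path(plan_config).resolve().relative_to(workspace_path)
    return workspace_path, relative_plan
```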
21 changes: 21 additions & 0 deletions openfl/utilities/workspace.py
@@ -8,6 +8,7 @@
import shutil
import sys
import time
from contextlib import contextmanager
from pathlib import Path
from subprocess import check_call
from sys import executable
@@ -126,3 +127,23 @@ def _is_package_versioned(package: str) -> bool:
and package not in ['pkg-resources==0.0.0', 'pkg_resources==0.0.0']
and '-e ' not in package
)


@contextmanager
def set_directory(path: Path):
"""
Sets provided path as the cwd within the context.

Args:
path (Path): The path to the cwd

Yields:
None
"""

origin = Path().absolute()
try:
os.chdir(path)
yield
finally:
os.chdir(origin)
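A quick usage sketch of this context-manager pattern (the function is redefined below so the snippet runs standalone): the working directory changes inside the `with` block and is restored afterwards, even if the body raises.

```python
# Standalone usage sketch of the set_directory() pattern added above
# (redefined here so the example is self-contained).
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def set_directory(path: Path):
    """Temporarily set `path` as the cwd, restoring the original on exit."""
    origin = Path().absolute()
    try:
        os.chdir(path)
        yield
    finally:
        # Restored even if the body raised an exception.
        os.chdir(origin)


before = Path.cwd()
with tempfile.TemporaryDirectory() as tmp:
    with set_directory(Path(tmp)):
        print("inside:", Path.cwd())
print("restored:", Path.cwd() == before)
```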
12 changes: 11 additions & 1 deletion tests/github/test_hello_federation.sh
@@ -18,18 +18,20 @@ help() {
echo "--rounds-to-train rounds to train"
echo "--col1-data-path data path for collaborator 1"
echo "--col2-data-path data path for collaborator 2"
echo "--save-model path to save model in native format"
echo "-h, --help display this help and exit"
}

# Getting additional options
ADD_OPTS=$(getopt -o "h" -l "rounds-to-train:,col1-data-path:,
- col2-data-path:,help" -n test_hello_federation.sh -- "$@")
+ col2-data-path:,save-model:,help" -n test_hello_federation.sh -- "$@")
eval set -- "$ADD_OPTS"
while (($#)); do
case "${1:-}" in
(--rounds-to-train) ROUNDS_TO_TRAIN="$2" ; shift 2 ;;
(--col1-data-path) COL1_DATA_PATH="$2" ; shift 2 ;;
(--col2-data-path) COL2_DATA_PATH="$2" ; shift 2 ;;
(--save-model) SAVE_MODEL="$2" ; shift 2 ;;
(-h|--help) help ; exit 0 ;;

(--) shift ; break ;;
@@ -117,4 +119,12 @@ fx collaborator start -n ${COL1} &
cd ${COL2_DIRECTORY}/${FED_WORKSPACE}
fx collaborator start -n ${COL2}
wait

# Convert model to native format
if [[ ! -z "$SAVE_MODEL" ]]
then
cd ${FED_DIRECTORY}
fx model save -i "./save/${TEMPLATE}_last.pbuf" -o ${SAVE_MODEL}
fi

rm -rf ${FED_DIRECTORY}
38 changes: 38 additions & 0 deletions tests/openfl/interface/test_model_api.py
@@ -0,0 +1,38 @@
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Model interface tests module."""

from unittest import mock

from openfl import get_model
from openfl.federated.task import TaskRunner


@mock.patch('openfl.interface.model.utils')
@mock.patch('openfl.interface.model.Plan')
def test_get_model(Plan, utils):
"""Test get_model returns TaskRunner."""
plan_instance = mock.Mock()
plan_instance.cols_data_paths = ['mock_col_name']
Plan.parse.return_value = plan_instance

plan_instance.get_task_runner.return_value = TaskRunner(data_loader=mock.Mock())
TaskRunner.set_tensor_dict = mock.Mock()

tensor_dict = mock.Mock()
utils.deconstruct_model_proto.return_value = tensor_dict, {}

# Function call
result = get_model('plan_path', 'cols_path', 'data_path', 'model_protobuf_path')

# Asserts below
Plan.parse.assert_called_once()

utils.load_proto.assert_called_once()
utils.deconstruct_model_proto.assert_called_once()

plan_instance.get_task_runner.assert_called_once()

TaskRunner.set_tensor_dict.assert_called_once_with(tensor_dict, with_opt_vars=False)

assert isinstance(result, TaskRunner)