[Runtime] Pipeline Executor Second patch, configuration load and executor export/import. (#9108)

* [pipeline executor] Add a configuration load function and pipeline executor export/import functions.

* address review comments.

* polish comments and doc string.

* address review comments.

* address review comments.

* Change mod_idx to start from 0; remove the mod_idx - 1 logic.

* address review comments.

* polish documents.

* address review comments.

* address review comments.

* address review comments.

* polish the document.

* address review comments.

* address review comments.

* polish comments.

* Trigger build.

* address review comments.

* address review comments.

* fix grammar issue.

* polish documents.

* add single global binding check.

* address review comments.

* Trigger build.
huajsj authored Oct 14, 2021
1 parent 2dc58be commit b206570
Showing 7 changed files with 660 additions and 74 deletions.
195 changes: 151 additions & 44 deletions python/tvm/contrib/pipeline_executor.py
@@ -16,6 +16,7 @@
 # under the License.
 """Pipeline executor that executes a series of modules in a pipeline fashion."""
 import json
+import os
 import tvm._ffi
 from tvm import relay
 from tvm.relay.transform import InferType
@@ -47,21 +48,21 @@ def build(pipe_configs):
     ret: PipelineExecutorFactoryModule
         Common interface for pipeline executor factory modules.
     """
-    mods = {}
+    libs = {}
     mod_n_configs = pipe_configs.get_config()
     config_len = len(mod_n_configs)
     string_config = [{} for _ in range(config_len)]
     for ir_mod, mod_config in mod_n_configs.items():
         mconf = mod_config["pipeline"].copy()
-        mod_idx = mconf["mod_idx"] - 1
+        mod_idx = mconf["mod_idx"]
         dev = mod_config["dev"]
         target = mod_config["target"]
         build_func = relay.build
         # Check whether there is a customized build function.
         if "build" in mod_config and mod_config["build"]:
             build_func = mod_config["build"]

-        mod = build_func(
+        lib = build_func(
             ir_mod,
             target,
             params=mod_config["params"],
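For context, here is a minimal sketch of how the updated build() entry point is driven, assuming TVM was built with USE_PIPELINE_EXECUTOR=ON; the toy Relay modules, shapes, and interface names below are illustrative, not part of this patch.

import tvm
from tvm import relay
from tvm.contrib import pipeline_executor

def make_mod():
    # A tiny placeholder Relay module standing in for a real network.
    x = relay.var("x", shape=(1, 16), dtype="float32")
    return tvm.IRModule.from_expr(relay.Function([x], relay.nn.relu(x)))

mod0, mod1 = make_mod(), make_mod()
pipe_config = pipeline_executor.PipelineConfig()
# Wire pipeline input -> mod0 -> mod1 -> pipeline output.
pipe_config["input"]["data"].connect(pipe_config[mod0]["input"]["x"])
pipe_config[mod0]["output"][0].connect(pipe_config[mod1]["input"]["x"])
pipe_config[mod1]["output"][0].connect(pipe_config["output"][0])
# Per-module build settings consumed by build() above.
for m in (mod0, mod1):
    pipe_config[m].target = "llvm"
    pipe_config[m].dev = tvm.cpu(0)

factory = pipeline_executor.build(pipe_config)  # a PipelineExecutorFactoryModule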
@@ -72,22 +73,69 @@ def build(pipe_configs):
         mconf["dev"] = "{},{}".format(dev.device_type, dev.device_id)
         # Create a pipeline configuration.
         string_config[mod_idx] = mconf
-        mods[mod] = {"dev": dev}
+        libs[mod_idx] = {"lib": lib, "dev": dev}

-    return PipelineExecutorFactoryModule(mods, string_config)
+    return PipelineExecutorFactoryModule(libs, string_config)


 class PipelineModule(object):
     """Wrapper of runtime module, caller can use this module to set parameters and get outputs.
     Parameters
     ----------
-    module : PipelineExecutorFactoryModule
-        Common interface for pipeline executor factory modules.
+    module : Union[PipelineExecutorFactoryModule, Module]
+        Common interface for pipeline executor factory modules or Module.
     """

     def __init__(self, module):
-        self.module = module.module
+        if isinstance(module, PipelineExecutorFactoryModule):
+            self.module = module.module
+        else:
+            self.module = module
+        # Get the packed functions from the pipeline executor.
+        self._get_num_outputs = self.module["get_num_outputs"]
+
+    @property
+    def num_outputs(self):
+        """Get the number of outputs.
+        Returns
+        -------
+        count : int
+            The number of outputs.
+        """
+        return self._get_num_outputs()
+
+    @staticmethod
+    def load_library(config_file_name):
+        """Import files to create a pipeline executor.
+        Parameters
+        ----------
+        config_file_name : str
+            Path and name of the configuration file; the configuration file contains the
+            disk paths of the parameter file, library file, and JSON file.
+        """
+        with open(config_file_name, "r") as file_handle:
+            config = file_handle.read()
+        config = json.loads(config)
+        if "load_config" not in config or "pipeline_config" not in config:
+            raise RuntimeError(
+                '"load_config" or "pipeline_config" is missing in %s' % config_file_name
+            )
+
+        # The config file used to load the library, parameters, and JSON files.
+        with open(config["load_config"], "r") as file_handle:
+            load_config = file_handle.read()
+
+        # The config file used to load the pipeline compute config.
+        with open(config["pipeline_config"], "r") as file_handle:
+            pipeline_config = file_handle.read()
+
+        # Load a PipelineExecutor from the disk files.
+        load_library = tvm._ffi.get_global_func("tvm.pipeline_executor.load", allow_missing=False)
+        module = load_library(load_config, pipeline_config)
+
+        return PipelineModule(module)
+
+
 class PipelineConfig(object):
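As a hedged illustration of the new import path: given a top-level configuration file previously written by export_library (added later in this diff; the path here is made up), a pipeline executor can be recreated directly.

from tvm.contrib import pipeline_executor

# "/tmp/pipeline/config" is an illustrative path to the exported config file.
pipeline_module = pipeline_executor.PipelineModule.load_library("/tmp/pipeline/config")
print(pipeline_module.num_outputs)  # number of pipeline-level outputs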
@@ -139,13 +187,14 @@ def get_owner_idx(self):
             if isinstance(self.io_owner, PipelineConfig.ModuleWrapper):
                 return self.io_owner.idx

-            return 0
+            return -1

-        def is_global_interface(self):
-            """The global interface is the interface visible to the caller which use a pipeline
-            executor, the global input interface is responsible for passing parameters to the
-            internal module interface, and the global output interface is responsible for
-            outputting the results computed by the pipeline executor to a caller.
+        def is_pipeline_executor_interface(self):
+            """The pipeline interface is used to interact with the caller. There are two types
+            of interfaces: one is 'input', the other is 'output'. The pipeline input interface
+            is responsible for passing parameters to the internal module interface, and the
+            pipeline output interface is responsible for outputting the results computed by
+            the pipeline executor to the caller.
             """
             return not isinstance(self.io_owner, PipelineConfig.ModuleWrapper)
@@ -182,9 +231,9 @@ def check_dag_acyclic(self, start, inputs):
         def connect(self, binding):
             """Connect the current interface to the destination interface.
-            Correct connections are as follows: 1. global input connected to module input,
-            2. module output connected to global output, 3. module output connected to
-            module input.
+            Correct connections are as follows: 1. the pipeline input connected to a module input,
+            2. the module output connected to a pipeline output, 3. the module output connected to
+            a module input.
             Parameters
             ----------
@@ -196,31 +245,31 @@ def connect(self, binding):
             if self.io_owner == binding.io_owner:
                 raise RuntimeError(f"Can not bind itself.")

-            if not self.is_global_interface() and self.io_type == "input":
+            if not self.is_pipeline_executor_interface() and self.io_type == "input":
                 raise RuntimeError(f"Module can only bind from output interface!")

             if (
-                not self.is_global_interface()
-                and not binding.is_global_interface()
+                not self.is_pipeline_executor_interface()
+                and not binding.is_pipeline_executor_interface()
                 and binding.io_type == "output"
             ):
                 raise RuntimeError(f"Can not bind module output with another module output!")

             if (
-                not self.is_global_interface()
-                and binding.is_global_interface()
+                not self.is_pipeline_executor_interface()
+                and binding.is_pipeline_executor_interface()
                 and binding.io_type == "input"
             ):
-                raise RuntimeError(f"Can not bind module output with global input!")
+                raise RuntimeError(f"Can not bind module output with pipeline input!")

-            if self.is_global_interface() and self.io_type == "output":
+            if self.is_pipeline_executor_interface() and self.io_type == "output":
                 raise RuntimeError(f"Global output can not be used as binding start point.")

-            if self.is_global_interface() and binding.io_type != "input":
+            if self.is_pipeline_executor_interface() and binding.io_type != "input":
                 raise RuntimeError(f"Global input can only bind with module input.")

             self.bindings.append(binding)
-            if not self.is_global_interface():
+            if not self.is_pipeline_executor_interface():
                 # Check whether the data types of the source and destination are the same.
                 if (
                     isinstance(binding.io_owner, PipelineConfig.ModuleWrapper)
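To make the checks above concrete: with mod0 and mod1 from the earlier sketch wired into a fresh PipelineConfig, binding one module output to another module output is rejected at configuration time (a hypothetical snippet, not from the patch).

fresh_config = pipeline_executor.PipelineConfig()
try:
    # Module output -> module output violates the connection rules.
    fresh_config[mod0]["output"][0].connect(fresh_config[mod1]["output"][0])
except RuntimeError as err:
    print(err)  # Can not bind module output with another module output!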
@@ -431,13 +480,16 @@ def get_config(self):
                     for dep in binding.bindings:
                         dep_item = {}
                         _, dname = dep.get_name()
-                        dep_item["mod_idx"] = dep.get_owner_idx()
-                        dep_item["input_name"] = dname
+                        if dep.is_pipeline_executor_interface():
+                            dep_item["global_output_index"] = int(dname)
+                        else:
+                            dep_item["mod_idx"] = dep.get_owner_idx()
+                            dep_item["input_name"] = dname
                         dep_conf.append(dep_item)

                     # The value of output_idx starts from 0.
                     output["output_idx"] = int(binding.name)
-                    output["dependent"] = dep_conf
+                    output["dependencies"] = dep_conf
                     output_conf.append(output)

                 mconf["mod_idx"] = module.idx
@@ -472,7 +524,7 @@ def dag_topology_sort(self):
             mlist += temp_list

         for mod, i in zip(mlist, range(len(mlist))):
-            self.mod_wrapper[mod].set_idx_name(i + 1)
+            self.mod_wrapper[mod].set_idx_name(i)

     def get_mod_idx(self, mod):
         # Return the module index.
Expand Down Expand Up @@ -502,16 +554,13 @@ class PipelineExecutorFactoryModule(object):
"""

def __init__(self, pipeline_mods, mods_config):
mods, config = self.graph_executor_create(pipeline_mods, mods_config)
assert (
pipeline_executor_enabled()
), "Pipeline executor is not enabled. Please \
re-build TVM with USE_PIPELINE_EXECUTOR=ON"
pipeline_create = tvm._ffi.get_global_func(
self.pipeline_mods = pipeline_mods
self.mods_config = mods_config
graph_executors, config = self.graph_executor_create(pipeline_mods, mods_config)
self.pipeline_create = tvm._ffi.get_global_func(
"tvm.pipeline_executor.create", allow_missing=False
)
assert pipeline_create
self.module = pipeline_create(mods, config)
self.module = self.pipeline_create(graph_executors, config)

def graph_executor_create(self, pipeline_mods, mod_config):
"""Create graph_executor list and return configuration as a json string.
@@ -532,12 +581,70 @@ def graph_executor_create(self, pipeline_mods, mod_config):
         mod_config : str
             The Module configuration.
         """
-        mods = []
-        for pipeline_mod in pipeline_mods:
-            mod = graph_executor.GraphModule(
-                pipeline_mod["default"](pipeline_mods[pipeline_mod]["dev"])
-            )
-            mods.append(mod.module)
+        # Store the modules in the 'mods' list, in index order.
+        mods = [None for _ in range(len(pipeline_mods))]
+        for lib_index in pipeline_mods:
+            pipeline_lib = pipeline_mods[lib_index]["lib"]
+            dev = pipeline_mods[lib_index]["dev"]
+            lib = graph_executor.GraphModule(pipeline_lib["default"](dev))
+            # Return a module list sorted by lib_index.
+            mods[lib_index] = lib.module

         return mods, json.dumps(mod_config)

+    def export_library(self, directory_path):
+        """Export the pipeline executor into disk files.
+        Parameters
+        ----------
+        directory_path : str
+            Export the files to this directory.
+        """
+        if not self.pipeline_mods:
+            raise RuntimeError(f"The pipeline executor has not been initialized.")
+
+        # Check if the directory_path exists.
+        if not os.path.exists(directory_path):
+            raise RuntimeError(f"The directory {directory_path} does not exist.")
+        # Create a load configuration.
+        load_config_file_name = "{}/load_config".format(directory_path)
+        pipeline_config_file_name = "{}/pipeline_config".format(directory_path)
+        config = {}
+        config["load_config"] = load_config_file_name
+        config["pipeline_config"] = pipeline_config_file_name
+        load_config = []
+        # Export the library, JSON, and parameter files, then record their paths
+        # in a configuration file.
+        for lib_index in self.pipeline_mods:
+            mconfig = {}
+            mconfig["mod_idx"] = lib_index
+            mconfig["lib_name"] = "{}/lib{}.so".format(directory_path, lib_index)
+            mconfig["json_name"] = "{}/json{}".format(directory_path, lib_index)
+            mconfig["params_name"] = "{}/params{}".format(directory_path, lib_index)
+            mconfig["dev"] = "{},{}".format(
+                self.pipeline_mods[lib_index]["dev"].device_type,
+                self.pipeline_mods[lib_index]["dev"].device_id,
+            )
+
+            # Get the graph, lib, and parameters from GraphExecutorFactoryModule.
+            graph, lib, params = self.pipeline_mods[lib_index]["lib"]
+            # Export the lib, graph, and parameters to disk.
+            lib.export_library(mconfig["lib_name"])
+            with open(mconfig["json_name"], "w") as file_handle:
+                file_handle.write(graph)
+            with open(mconfig["params_name"], "wb") as file_handle:
+                file_handle.write(relay.save_param_dict(params))
+
+            load_config.append(mconfig)
+
+        with open(load_config_file_name, "w") as file_handle:
+            json.dump(load_config, file_handle)
+
+        with open(pipeline_config_file_name, "w") as file_handle:
+            json.dump(self.mods_config, file_handle)
+
+        config_file_name = "{}/config".format(directory_path)
+        with open(config_file_name, "w") as file_handle:
+            json.dump(config, file_handle)
+
+        return config_file_name
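Putting the two halves together, a hedged export/reload roundtrip, assuming the factory object from the earlier build() sketch; export_library requires the target directory to exist, so it is created first.

import os
from tvm.contrib import pipeline_executor

export_dir = "/tmp/pipeline"  # illustrative path
os.makedirs(export_dir, exist_ok=True)
config_file_name = factory.export_library(export_dir)
# The returned top-level config references the load_config and pipeline_config
# files, which PipelineModule.load_library() consumes to rebuild the executor.
pipeline_module = pipeline_executor.PipelineModule.load_library(config_file_name)
print(pipeline_module.num_outputs)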