diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index 36c03891d210..37b9fed8eb91 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -16,6 +16,7 @@ # under the License. """Pipeline executor that executes a series of modules in a pipeline fashion.""" import json +import os import tvm._ffi from tvm import relay from tvm.relay.transform import InferType @@ -47,13 +48,13 @@ def build(pipe_configs): ret: PipelineExecutorFactoryModule Common interface for pipeline executor factory modules. """ - mods = {} + libs = {} mod_n_configs = pipe_configs.get_config() config_len = len(mod_n_configs) string_config = [{} for _ in range(config_len)] for ir_mod, mod_config in mod_n_configs.items(): mconf = mod_config["pipeline"].copy() - mod_idx = mconf["mod_idx"] - 1 + mod_idx = mconf["mod_idx"] dev = mod_config["dev"] target = mod_config["target"] build_func = relay.build @@ -61,7 +62,7 @@ def build(pipe_configs): if "build" in mod_config and mod_config["build"]: build_func = mod_config["build"] - mod = build_func( + lib = build_func( ir_mod, target, params=mod_config["params"], @@ -72,9 +73,9 @@ def build(pipe_configs): mconf["dev"] = "{},{}".format(dev.device_type, dev.device_id) # Create a pipeline configuration. string_config[mod_idx] = mconf - mods[mod] = {"dev": dev} + libs[mod_idx] = {"lib": lib, "dev": dev} - return PipelineExecutorFactoryModule(mods, string_config) + return PipelineExecutorFactoryModule(libs, string_config) class PipelineModule(object): @@ -82,12 +83,59 @@ class PipelineModule(object): Parameters ---------- - module : PipelineExecutorFactoryModule - Common interface for pipeline executor factory modules. + module : Union[PipelineExecutorFactoryModule, Module] + Common interface for pipeline executor factory modules or Module. 
""" def __init__(self, module): - self.module = module.module + if isinstance(module, PipelineExecutorFactoryModule): + self.module = module.module + else: + self.module = module + # Get the packed functions from the pipeline executor. + self._get_num_outputs = self.module["get_num_outputs"] + + @property + def num_outputs(self): + """Get the number of outputs. + Returns + ------- + count : int + The number of outputs. + """ + return self._get_num_outputs() + + @staticmethod + def load_library(config_file_name): + """Import files to create a pipeline executor. + + Parameters + ---------- + config_file_name : str + Path and name of the configuration file, the configuration file contains the + disk path of the parameter file, library file, and JSON file. + """ + with open(config_file_name, "r") as file_handle: + config = file_handle.read() + config = json.loads(config) + if "load_config" not in config or "pipeline_config" not in config: + raise RuntimeError( + '"load_config" or "pipeline_config" is missing in %s' % config_file_name + ) + + # The config file used to load library, prameters, and JSON files. + with open(config["load_config"], "r") as file_handle: + load_config = file_handle.read() + + # The config file used to load pipeline compute config. + with open(config["pipeline_config"], "r") as file_handle: + pipeline_config = file_handle.read() + + # Load a PipelineExecutor from the disk files. 
+ load_library = tvm._ffi.get_global_func("tvm.pipeline_executor.load", allow_missing=False) + module = load_library(load_config, pipeline_config) + + return PipelineModule(module) class PipelineConfig(object): @@ -139,13 +187,14 @@ def get_owner_idx(self): if isinstance(self.io_owner, PipelineConfig.ModuleWrapper): return self.io_owner.idx - return 0 + return -1 - def is_global_interface(self): - """The global interface is the interface visible to the caller which use a pipeline - executor, the global input interface is responsible for passing parameters to the - internal module interface, and the global output interface is responsible for - outputting the results computed by the pipeline executor to a caller. + def is_pipeline_executor_interface(self): + """The pipeline interface is used to interact with the caller. There are two types + of interfaces, one is 'input' another is 'output'. The pipeline input interface + is responsible for passing parameters to the internal module interface, and the + pipeline output interface is responsible for outputting the results computed by + the pipeline executor to the caller. """ return not isinstance(self.io_owner, PipelineConfig.ModuleWrapper) @@ -182,9 +231,9 @@ def check_dag_acyclic(self, start, inputs): def connect(self, binding): """Connect the current interface to the destination interface. - Correct connections are as follows: 1. global input connected to module input, - 2. module output connected to global output, 3. module output connected to - module input. + Correct connections are as follows: 1. the pipeline input connected to a module input, + 2. the module output connected to a pipeline output, 3. the module output connected to + a module input. 
Parameters ---------- @@ -196,31 +245,31 @@ def connect(self, binding): if self.io_owner == binding.io_owner: raise RuntimeError(f"Can not bind itself.") - if not self.is_global_interface() and self.io_type == "input": + if not self.is_pipeline_executor_interface() and self.io_type == "input": raise RuntimeError(f"Module can only bind from output interface!") if ( - not self.is_global_interface() - and not binding.is_global_interface() + not self.is_pipeline_executor_interface() + and not binding.is_pipeline_executor_interface() and binding.io_type == "output" ): raise RuntimeError(f"Can not bind module output with another module output!") if ( - not self.is_global_interface() - and binding.is_global_interface() + not self.is_pipeline_executor_interface() + and binding.is_pipeline_executor_interface() and binding.io_type == "input" ): - raise RuntimeError(f"Can not bind module output with global input!") + raise RuntimeError(f"Can not bind module output with pipeline input!") - if self.is_global_interface() and self.io_type == "output": + if self.is_pipeline_executor_interface() and self.io_type == "output": raise RuntimeError(f"Global output can not be used as binding start point.") - if self.is_global_interface() and binding.io_type != "input": + if self.is_pipeline_executor_interface() and binding.io_type != "input": raise RuntimeError(f"Global input can only bind with module input.") self.bindings.append(binding) - if not self.is_global_interface(): + if not self.is_pipeline_executor_interface(): # Check whether the data types of the source and destination are the same. 
if ( isinstance(binding.io_owner, PipelineConfig.ModuleWrapper) @@ -431,13 +480,16 @@ def get_config(self): for dep in binding.bindings: dep_item = {} _, dname = dep.get_name() - dep_item["mod_idx"] = dep.get_owner_idx() - dep_item["input_name"] = dname + if dep.is_pipeline_executor_interface(): + dep_item["global_output_index"] = int(dname) + else: + dep_item["mod_idx"] = dep.get_owner_idx() + dep_item["input_name"] = dname dep_conf.append(dep_item) # The value of ouput_idx start from 0. output["output_idx"] = int(binding.name) - output["dependent"] = dep_conf + output["dependencies"] = dep_conf output_conf.append(output) mconf["mod_idx"] = module.idx @@ -472,7 +524,7 @@ def dag_topology_sort(self): mlist += temp_list for mod, i in zip(mlist, range(len(mlist))): - self.mod_wrapper[mod].set_idx_name(i + 1) + self.mod_wrapper[mod].set_idx_name(i) def get_mod_idx(self, mod): # Return the module index. @@ -502,16 +554,13 @@ class PipelineExecutorFactoryModule(object): """ def __init__(self, pipeline_mods, mods_config): - mods, config = self.graph_executor_create(pipeline_mods, mods_config) - assert ( - pipeline_executor_enabled() - ), "Pipeline executor is not enabled. Please \ - re-build TVM with USE_PIPELINE_EXECUTOR=ON" - pipeline_create = tvm._ffi.get_global_func( + self.pipeline_mods = pipeline_mods + self.mods_config = mods_config + graph_executors, config = self.graph_executor_create(pipeline_mods, mods_config) + self.pipeline_create = tvm._ffi.get_global_func( "tvm.pipeline_executor.create", allow_missing=False ) - assert pipeline_create - self.module = pipeline_create(mods, config) + self.module = self.pipeline_create(graph_executors, config) def graph_executor_create(self, pipeline_mods, mod_config): """Create graph_executor list and return configuration as a json string. @@ -532,12 +581,70 @@ def graph_executor_create(self, pipeline_mods, mod_config): mod_config : str The Modudle configuration. 
""" + # Should store modules in the list named 'mods' in index order. + mods = [None for _ in range(len(pipeline_mods))] + for lib_index in pipeline_mods: + pipeline_lib = pipeline_mods[lib_index]["lib"] + dev = pipeline_mods[lib_index]["dev"] + lib = graph_executor.GraphModule(pipeline_lib["default"](dev)) + # Return a module list sorted by lib_index. + mods[lib_index] = lib.module + + return mods, json.dumps(mod_config) + + def export_library(self, directory_path): + """Export the pipeline executor into disk files. - mods = [] - for pipeline_mod in pipeline_mods: - mod = graph_executor.GraphModule( - pipeline_mod["default"](pipeline_mods[pipeline_mod]["dev"]) + Parameters + ---------- + directory_path : str + Export the files to this directory. + """ + if not self.pipeline_mods: + raise RuntimeError(f"The pipeline executor has not been initialized.") + + # Check if the directory_path exists. + if not os.path.exists(directory_path): + raise RuntimeError(f"The directory {directory_path} does not exist.") + # Create an load configuration. + load_config_file_name = "{}/load_config".format(directory_path) + pipeline_config_file_name = "{}/pipeline_config".format(directory_path) + config = {} + config["load_config"] = load_config_file_name + config["pipeline_config"] = pipeline_config_file_name + load_config = [] + # Export the library, JSON, and parameter into files, then export these files path + # into a configuration file. 
+ for lib_index in self.pipeline_mods: + mconfig = {} + mconfig["mod_idx"] = lib_index + mconfig["lib_name"] = "{}/lib{}.so".format(directory_path, lib_index) + mconfig["json_name"] = "{}/json{}".format(directory_path, lib_index) + mconfig["params_name"] = "{}/params{}".format(directory_path, lib_index) + mconfig["dev"] = "{},{}".format( + self.pipeline_mods[lib_index]["dev"].device_type, + self.pipeline_mods[lib_index]["dev"].device_id, ) - mods.append(mod.module) - return mods, json.dumps(mod_config) + # Get the graph, lib, and parameters from GraphExecutorFactoryModule. + graph, lib, params = self.pipeline_mods[lib_index]["lib"] + # Export the lib, graph, and parameters to disk. + lib.export_library(mconfig["lib_name"]) + with open(mconfig["json_name"], "w") as file_handle: + file_handle.write(graph) + with open(mconfig["params_name"], "wb") as file_handle: + file_handle.write(relay.save_param_dict(params)) + + load_config.append(mconfig) + + with open(load_config_file_name, "w") as file_handle: + json.dump(load_config, file_handle) + + with open(pipeline_config_file_name, "w") as file_handle: + json.dump(self.mods_config, file_handle) + + config_file_name = "{}/config".format(directory_path) + with open(config_file_name, "w") as file_handle: + json.dump(config, file_handle) + + return config_file_name diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc index 41f867057282..3820ce942af0 100644 --- a/src/runtime/pipeline/pipeline_executor.cc +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -21,31 +21,129 @@ * \file pipeline_executor.cc */ #include "pipeline_executor.h" - namespace tvm { namespace runtime { +/*! + * \brief Give frontends an access to packed functions. + * \param name The name of the function. + * \param sptr_to_self The pointer to the module node. + * \return The corresponding packed function. 
+ */ +PackedFunc PipelineExecutor::GetFunction(const std::string& name, + const ObjectPtr& sptr_to_self) { + if (name == "get_num_outputs") { + return PackedFunc( + [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->NumOutputs(); }); + } else { + LOG(FATAL) << "Unknown packed function: " << name; + return PackedFunc(); + } + return nullptr; +} -void PipelineRuntime::Init(const Array& modules, - const std::string& pipeline_json) { - return; +/*! + * \brief Use the mod_config information to create a graph runtime list. + * \param mod_config The config information that is generated by the export library function call. + */ +std::vector PipelineExecutor::CreateGraphModules(const ModuleConfig& mod_config) { + const PackedFunc* graph_executor_create = Registry::Get("tvm.graph_executor.create"); + std::vector ret; + ret.resize(mod_config.size()); + for (auto config : mod_config) { + // Load library. + auto lib = Module::LoadFromFile(config.second.lib_name.c_str()); + + // Read json. + std::ifstream ifJson(config.second.json_name.c_str()); + if (ifJson.fail()) { + LOG(FATAL) << "json file not found: " << config.second.json_name; + } + const std::string json((std::istreambuf_iterator(ifJson)), + std::istreambuf_iterator()); + + // Create a graph executor. + std::istringstream istr(config.second.dev); + std::string str; + int device_type = 1, device_id = 0; + while (getline(istr, str, ';')) { + std::istringstream istr_dev(str); + std::string str_temp; + if (getline(istr_dev, str_temp)) { + device_type = stoi(str_temp); + } + if (getline(istr_dev, str_temp)) { + device_id = stoi(str_temp); + } + } + Module graph_module = (*graph_executor_create)(json, lib, device_type, device_id); + + // Load parameters. 
+ TVMByteArray params_arr; + const char* params_file_name = config.second.params_name.c_str(); + std::ifstream if_param(params_file_name); + if (if_param.fail()) { + LOG(FATAL) << "params file not found: " << params_file_name; + } + const std::string params((std::istreambuf_iterator(if_param)), + std::istreambuf_iterator()); + params_arr.data = params.c_str(); + params_arr.size = params.length(); + auto load_params = graph_module.GetFunction("load_params"); + load_params(params_arr); + + // Put a graph executor module into the vector. + ret[config.first] = graph_module; + } + return ret; } -/* GetFunction can not be pure abstract function, implement an empty function for now. +/*! + * \brief Initialize the pipeline executor with a list of modules to be pipelined + * and config in JSON format. + * \param modules The module list used for building the pipeline. + * \param pipeline_json The configuration of modules dependencies. */ -PackedFunc PipelineRuntime::GetFunction(const std::string& name, - const ObjectPtr& sptr_to_self) { - return nullptr; +void PipelineExecutor::Init(const std::vector& modules, const std::string& pipeline_json) { + ICHECK(!modules.empty()) << "The graph executor module list is empty."; + // Use JSONReader to load pipeline configuration. + std::istringstream is(pipeline_json); + dmlc::JSONReader reader(&is); + PipelineConfig& pipeline_config = this->LoadPipelineConfig(&reader); + ICHECK(!pipeline_config.Empty()) << "The pipeline config information is empty."; + // Initialize the pipeline function class used for pipeline thread pool management + // and schedule etc. This function returns the number of output. 
+ num_outputs_ = pipeline_scheduler_.PipelineInit(modules, pipeline_config); + return; } -Module PipelineRuntimeCreate(const Array& m, - const std::string& pipeline_json) { - auto exec = make_object(); - exec->Init(m, pipeline_json); +Module PipelineExecutorCreate(const Array& m, const std::string& pipeline_json) { + ICHECK(!m.empty()) << "The module list is empty."; + auto exec = make_object(); + std::vector graph_modules; + for (auto mod : m) { + graph_modules.push_back(mod); + } + exec->Init(graph_modules, pipeline_json); + return Module(exec); +} + +Module PipelineExecutorLoad(const std::string& load_json, const std::string& pipeline_json) { + auto exec = make_object(); + std::istringstream is(load_json); + dmlc::JSONReader reader(&is); + ModuleConfig& mod_config = exec->LoadModuleConfig(&reader); + ICHECK(!mod_config.empty()) << "The module config is empty."; + std::vector modules = exec->CreateGraphModules(mod_config); + exec->Init(modules, pipeline_json); return Module(exec); } TVM_REGISTER_GLOBAL("tvm.pipeline_executor.create").set_body([](TVMArgs args, TVMRetValue* rv) { - *rv = PipelineRuntimeCreate(args[0], args[1]); + *rv = PipelineExecutorCreate(args[0], args[1]); +}); + +TVM_REGISTER_GLOBAL("tvm.pipeline_executor.load").set_body([](TVMArgs args, TVMRetValue* rv) { + *rv = PipelineExecutorLoad(args[0], args[1]); }); } // namespace runtime } // namespace tvm diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h index c7625c62b724..a883ba25ec08 100644 --- a/src/runtime/pipeline/pipeline_executor.h +++ b/src/runtime/pipeline/pipeline_executor.h @@ -23,9 +23,16 @@ */ #ifndef TVM_RUNTIME_PIPELINE_PIPELINE_EXECUTOR_H_ #define TVM_RUNTIME_PIPELINE_PIPELINE_EXECUTOR_H_ + #include +#include +#include +#include #include +#include + +#include "pipeline_scheduler.h" namespace tvm { namespace runtime { /*! 
@@ -36,18 +43,23 @@ namespace runtime { * * This executor can be accessed by various language via TVM runtime PackedFunc API. */ -class TVM_DLL PipelineRuntime : public ModuleNode { +class TVM_DLL PipelineExecutor : public ModuleNode { public: /*! * \Return the type key of the executor. */ - const char* type_key() const final { return "PipelineRuntime"; } + const char* type_key() const final { return "PipelineExecutor"; } /*! - * \brief Initialize the pipeline executor with module array and json text. + * \brief Initialize the pipeline executor with module array and JSON text. * \param modules The module list used for building pipeline. * \param pipeline_json The configuration of modules dependencies. */ - void Init(const Array& modules, const std::string& pipeline_json); + void Init(const std::vector& modules, const std::string& pipeline_json); + /*! + * \brief Use the information of mod_config to create a list of graph executor. + * \param mod_config The configuration information generated by the library export function call. + */ + std::vector CreateGraphModules(const ModuleConfig& mod_config); /*! * \brief Give frontends an access to packed functions. * \param name The name of the function. @@ -55,6 +67,86 @@ class TVM_DLL PipelineRuntime : public ModuleNode { * \return The corresponding packed function. */ virtual PackedFunc GetFunction(const std::string& name, const ObjectPtr& sptr_to_self); + + /*! + * \brief Get the number of outputs. + * + * \return The number of outputs. 
+ */ + int NumOutputs() const { return num_outputs_; } + + /*!\brief Load the module files information.*/ + ModuleConfig& LoadModuleConfig(dmlc::JSONReader* reader) { + reader->BeginArray(); + while (reader->NextArrayItem()) { + std::string key; + reader->BeginObject(); + int mod_idx = -1; + std::string lib_name; + std::string json_name; + std::string params_name; + std::string dev; + while (reader->NextObjectItem(&key)) { + if (key == "mod_idx") { + reader->Read(&mod_idx); + } else if (key == "lib_name") { + reader->Read(&lib_name); + } else if (key == "json_name") { + reader->Read(&json_name); + } else if (key == "params_name") { + reader->Read(&params_name); + } else if (key == "dev") { + reader->Read(&dev); + } else { + LOG(FATAL) << "do not support key " << key; + } + } + ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; + // Load the lib, json, and params information. + ICHECK(!lib_name.empty()) << "lib_name is empty."; + ICHECK(!json_name.empty()) << "json_name is empty."; + ICHECK(!params_name.empty()) << "params_name is empty."; + mod_config_[mod_idx] = GraphModuleLoadInfo(lib_name, json_name, params_name, dev); + } + return mod_config_; + } + + private: + /*!\brief The class used to execute and schedule the pipeline logic.*/ + PipelineScheduler pipeline_scheduler_; + /*!\brief The dependency information of each graph runtime module of the pipeline.*/ + PipelineConfig pipeline_config_; + /*!\brief The module information used to create the graph runtimes.*/ + ModuleConfig mod_config_; + /*!\brief How many outputs are in this pipeline executor.*/ + size_t num_outputs_ = 0; + /*!\brief Json loader.*/ + PipelineConfig& LoadPipelineConfig(dmlc::JSONReader* reader) { + reader->BeginArray(); + while (reader->NextArrayItem()) { + std::string key; + reader->BeginObject(); + int mod_idx = -1; + OutputMap output; + std::string dev; + while (reader->NextObjectItem(&key)) { + if (key == "mod_idx") { + reader->Read(&mod_idx); + } else if (key == "dev") { + 
reader->Read(&dev); + } else if (key == "output") { + reader->Read(&output); + } else { + LOG(FATAL) << "do not support key " << key; + } + } + ICHECK(mod_idx >= 0) << "Invalid mod_idx value " << mod_idx; + // Check if the output is successfully read. + ICHECK(!output.Empty()) << "Invalid output binding result."; + pipeline_config_.Insert(mod_idx, output); + } + return pipeline_config_; + } }; } // namespace runtime } // namespace tvm diff --git a/src/runtime/pipeline/pipeline_scheduler.cc b/src/runtime/pipeline/pipeline_scheduler.cc new file mode 100644 index 000000000000..82caf855a479 --- /dev/null +++ b/src/runtime/pipeline/pipeline_scheduler.cc @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "pipeline_scheduler.h" + +#include +#include +namespace tvm { +namespace runtime { +/*! + * \brief Initialize the pipeline. + * \param modules The list of graph executor modules. + * \param pipeline_config The dependency information of each graph executor module. 
+ */ +size_t PipelineScheduler::PipelineInit(const std::vector& modules, + const PipelineConfig& pipeline_config) { + graph_modules_ = modules; + int num_output = pipeline_config.GetGlobalOutputNum(); + return num_output; +} +} // namespace runtime +} // namespace tvm diff --git a/src/runtime/pipeline/pipeline_scheduler.h b/src/runtime/pipeline/pipeline_scheduler.h new file mode 100644 index 000000000000..5ee127edffa3 --- /dev/null +++ b/src/runtime/pipeline/pipeline_scheduler.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef TVM_RUNTIME_PIPELINE_PIPELINE_SCHEDULER_H_ +#define TVM_RUNTIME_PIPELINE_PIPELINE_SCHEDULER_H_ +#include +#include +#include + +#include +#include +#include +#include + +#include "pipeline_struct.h" +namespace tvm { +namespace runtime { +/*! + * \brief The class that executes the pipeline logic,it is used to initialize the thread pool, + execute and schedule pipeline tasks, allocate and manage memory, etc. + */ +class PipelineScheduler { + public: + /*! + * \brief Initialize the pipeline. + * \param modules The list of graph executor module. + * \param pipeline_config The dependency information of each graph executor module. 
+ */ + size_t PipelineInit(const std::vector& modules, const PipelineConfig& pipeline_config); + + private: + /*!\brief The list of graph executors.*/ + std::vector graph_modules_; +}; +} // namespace runtime +} // namespace tvm +#endif // TVM_RUNTIME_PIPELINE_PIPELINE_SCHEDULER_H_ diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h new file mode 100644 index 000000000000..3cc9621702c1 --- /dev/null +++ b/src/runtime/pipeline/pipeline_struct.h @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef TVM_RUNTIME_PIPELINE_PIPELINE_STRUCT_H_ +#define TVM_RUNTIME_PIPELINE_PIPELINE_STRUCT_H_ +#include +#include +#include + +#include +#include +#include +#include +/*! + * \brief All binding information of a output interface. + */ +struct OutputBindings { + /*!\brief Output interface binding information, 'int' is the index of the module that + * uses this output data as the input interface data, 'string' is the input interface name + * of the module. + */ + std::unordered_map bindings; + /*! 
The index value of the global interface to which the current output are bound.*/ + int global_output_index = std::numeric_limits::min(); + /*!\brief Whether this binding is bound to the PipelineExecutor output interface.*/ + bool IsGlobalOutput() const { return global_output_index >= 0; } + /*! + * \brief Create a module interface map from JSONReader. + * \param reader JSON reader. + */ + void Load(dmlc::JSONReader* reader) { + reader->BeginArray(); + while (reader->NextArrayItem()) { + std::string key; + reader->BeginObject(); + std::string input_name; + int mod_idx = std::numeric_limits::min(); + // Whether the output binding is global. + bool global_binding = false; + while (reader->NextObjectItem(&key)) { + if (key == "mod_idx") { + reader->Read(&mod_idx); + } else if (key == "input_name") { + reader->Read(&input_name); + } else if (key == "global_output_index") { + // There should be only one global binding. + ICHECK(global_output_index < 0); + reader->Read(&global_output_index); + // When the key value is 'global_output_index', it means that this output is bound to + // a global interface. + global_binding = true; + } else { + LOG(FATAL) << "do not support key " << key; + } + } + // When this output is bound to a global interface, check if the global interface index + // start from 0. + if (global_binding) { + ICHECK(global_output_index >= 0); + } else { + // When this output is bound to a graph executor module interface, check if the module + // index start from 0. + ICHECK(mod_idx >= 0); + bindings[mod_idx] = input_name; + } + } + } +}; + +/*! + * \brief The binding information of all outputs of a module. + */ +struct OutputMap { + /*! \brief Output binding map, 'int' is output interface index.*/ + std::unordered_map output_binding_map; + OutputMap& operator=(const OutputMap& output) { + output_binding_map = output.output_binding_map; + return *this; + } + + /*!\brief This function is used to verify whether OutputMap is successfully loaded. 
+ * \return Return true to indicate that this class has not been successfully loaded. + */ + bool Empty() { return output_binding_map.empty(); } + /*! \brief The pipeline outputs is the final outputs of pipeline, this function is used to + * get how many pipeline outputs are in this Outputmap + * \return Number of pipeline outputs. + */ + size_t GetGlobalOutputNum(void) const { + size_t num_output = 0; + for (auto bindings : output_binding_map) { + num_output += bindings.second.IsGlobalOutput() ? 1 : 0; + } + return num_output; + } + + /*! + * \brief Create a output binding map from JSONReader. + * \param reader Json reader. + */ + void Load(dmlc::JSONReader* reader) { + reader->BeginArray(); + while (reader->NextArrayItem()) { + std::string key; + reader->BeginObject(); + int output_idx = -1; + OutputBindings binding; + while (reader->NextObjectItem(&key)) { + if (key == "output_idx") { + reader->Read(&output_idx); + } else if (key == "dependencies") { + reader->Read(&binding); + } else { + LOG(FATAL) << "do not support key " << key; + } + } + ICHECK(output_idx >= 0); + output_binding_map[output_idx] = binding; + } + } +}; +/*! + * \brief The binding or dependency information of each module output interface. + */ +struct PipelineConfig { + /*!\brief The key is the module index, this variable records all module pipeline configuration + * information. + */ + std::unordered_map config; + OutputMap& operator[](int key) { + ICHECK(config.find(key) != config.end()); + return config[key]; + } + + void Insert(int key, const OutputMap& map) { config[key] = map; } + + /*!\brief This function is used to verify whether config is loaded successfully. + * \return Return true to indicate that this class has not been successfully loaded. + */ + bool Empty() { return config.empty(); } + + /*! + * \brief Get the number of global outputs. + * \return The number of outputs the entire pipeline has. 
+ */ + size_t GetGlobalOutputNum() const { + size_t num_output = 0; + for (auto mod_output : config) { + num_output += mod_output.second.GetGlobalOutputNum(); + } + return num_output; + } +}; +/*! + * \brief The information used to initialize the graph executor module, the information + * come from the export library function call. + */ +struct GraphModuleLoadInfo { + GraphModuleLoadInfo(const std::string& lib, const std::string& json, const std::string& params, + const std::string& device) + : lib_name(lib), json_name(json), params_name(params), dev(device) {} + GraphModuleLoadInfo() { ; } + std::string lib_name; + std::string json_name; + std::string params_name; + std::string dev; +}; +/*! The Module information of each module.The 'int' is module index. */ +using ModuleConfig = std::unordered_map; +#endif // TVM_RUNTIME_PIPELINE_PIPELINE_STRUCT_H_ diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py index d9411c92c375..4a9b7eacdf65 100644 --- a/tests/python/relay/test_pipeline_executor.py +++ b/tests/python/relay/test_pipeline_executor.py @@ -16,6 +16,7 @@ # under the License. import pytest +import os import numpy as np import tvm import tvm.testing @@ -76,11 +77,11 @@ def get_manual_conf(mods, target): # The third output is the final output, the second output is for mod3, the first output # is for mod2 input. 
pipe_config1 = { - "mod_idx": 1, + "mod_idx": 0, "output": [ - {"output_idx": 0, "dependent": [{"mod_idx": 2, "input_name": "data_0"}]}, - {"output_idx": 1, "dependent": [{"mod_idx": 3, "input_name": "data_0"}]}, - {"output_idx": 2, "dependent": [{"mod_idx": 0, "input_name": "0"}]}, + {"output_idx": 0, "dependencies": [{"mod_idx": 1, "input_name": "data_0"}]}, + {"output_idx": 1, "dependencies": [{"mod_idx": 2, "input_name": "data_0"}]}, + {"output_idx": 2, "dependencies": [{"global_output_index": 0}]}, ], } mod_config[mods[0]] = { @@ -94,9 +95,9 @@ def get_manual_conf(mods, target): } pipe_config2 = { - "mod_idx": 2, + "mod_idx": 1, "output": [ - {"output_idx": 0, "dependent": [{"mod_idx": 3, "input_name": "data_1"}]}, + {"output_idx": 0, "dependencies": [{"mod_idx": 2, "input_name": "data_1"}]}, ], } mod_config[mods[1]] = { @@ -110,8 +111,8 @@ def get_manual_conf(mods, target): } pipe_config3 = { - "mod_idx": 3, - "output": [{"output_idx": 0, "dependent": [{"mod_idx": 0, "input_name": "1"}]}], + "mod_idx": 2, + "output": [{"output_idx": 0, "dependencies": [{"global_output_index": 1}]}], } mod_config[mods[2]] = { "pipeline": pipe_config3, @@ -128,7 +129,7 @@ def get_manual_conf(mods, target): def test_pipe_config_check(): # This function is used to trigger runtime error by applying wrong logic connection. - # Get the three pipeline modules here. + # Get three pipeline modules here. (mod1, mod2, mod3), dshape = get_mannual_mod() # The input or output name is illegal and expects a runtime error. @@ -179,10 +180,12 @@ def test_pipeline(): pipe_config = pipeline_executor.PipelineConfig() - # The global input named "data_0" will be connected to a input named "data_0" of mod1. + # The pipeline input named "data_0" will be connected to a input named "data_0" + # of mod1. pipe_config["input"]["data_0"].connect(pipe_config[mod1]["input"]["data_0"]) - # The global Input named "data_1" will be connected to a input named "data_1" of mod2. 
+ # The pipeline Input named "data_1" will be connected to a input named "data_1" + # of mod2. pipe_config["input"]["data_1"].connect(pipe_config[mod2]["input"]["data_1"]) # The mod1 output[0] will be connected to a input named "data_0" of mod2. @@ -194,10 +197,10 @@ def test_pipeline(): # The mod2 output[2] will be connected to a input named "data_1" of mod3. pipe_config[mod2]["output"][0].connect(pipe_config[mod3]["input"]["data_1"]) - # The mod1 output[2] will be connected to global output[1]. + # The mod1 output[2] will be connected to pipeline output[0]. pipe_config[mod1]["output"][2].connect(pipe_config["output"]["0"]) - # The mod3 output[0] will be connected to global output[2]. + # The mod3 output[0] will be connected to pipeline output[1]. pipe_config[mod3]["output"][0].connect(pipe_config["output"]["1"]) # Print configueration (print(pipe_config)), the result looks like following. # @@ -231,9 +234,21 @@ def test_pipeline(): with tvm.transform.PassContext(opt_level=3): pipeline_mod_factory = pipeline_executor.build(pipe_config) + # Export the parameter configuration to a file. + directory_path = tvm.contrib.utils.tempdir().temp_dir + # If the directory does not exist, create it. + if not os.path.exists(directory_path): + os.makedirs(directory_path) + config_file_name = pipeline_mod_factory.export_library(directory_path) + + # Use the output of build to create and initialize PipelineModule. pipeline_module = pipeline_executor.PipelineModule(pipeline_mod_factory) assert pipeline_module + # Use the import function to create and initialize PipelineModule. + pipeline_module_test = pipeline_executor.PipelineModule.load_library(config_file_name) + assert pipeline_module_test.num_outputs == 2 + if __name__ == "__main__": pytest.main([__file__])