From 6720d3593d4dac6015418d4b7e9ad875bbf0b0a2 Mon Sep 17 00:00:00 2001 From: Hua Jiang Date: Mon, 24 Jan 2022 16:40:08 -0800 Subject: [PATCH] [Runtime][PipelineExecutor] Add Pipeline Executor Interface (#10010) Adding interfaces into Pipeline Executor to "run", "stop","set input", and "get input" from the pipeline executor, In this patch, we also implemented the "BackendRuntime" structure to wrap the graph runtime interface in order to support pipeline executor interface and implement data copy method. This method is used to transfer data between two backend runtimes. --- python/tvm/contrib/pipeline_executor.py | 44 +++++++- src/runtime/pipeline/pipeline_executor.cc | 75 ++++++++++++- src/runtime/pipeline/pipeline_executor.h | 31 ++++++ src/runtime/pipeline/pipeline_scheduler.cc | 12 ++- src/runtime/pipeline/pipeline_scheduler.h | 4 +- src/runtime/pipeline/pipeline_struct.h | 105 +++++++++++++++++++ tests/python/relay/test_pipeline_executor.py | 11 +- 7 files changed, 271 insertions(+), 11 deletions(-) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index c75aa3dad43b..6e991f0c8d7a 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -115,10 +115,22 @@ def __init__(self, module): else: self.module = module # Get the packed functions from the pipeline executor. - self._get_num_outputs = self.module["get_num_outputs"] - self._get_input_pipeline_map = self.module["get_input_pipeline_map"] self._get_params_group_pipeline_map = self.module["get_params_group_pipeline_map"] + self._run = self.module["run"] + self._stop = self.module["stop"] self._set_param = self.module["set_param"] + self._set_input = self.module["set_input"] + self._get_input = self.module["get_input"] + self._get_num_outputs = self.module["get_num_outputs"] + self._get_input_pipeline_map = self.module["get_input_pipeline_map"] + + def run(self, sync=False): + """Run the pipeline executor.""" + self._run(sync) + + def stop(self): + """Stop the pipeline executor.""" + self._stop() def get_input_pipeline_map(self, name): """Using the "name" to get the corresponding subgraph index and also get the "input name" @@ -145,6 +157,21 @@ def get_params_group_pipeline_map(self, name): """ return self._get_params_group_pipeline_map(name) + def set_input(self, key, value): + """Set the input via input name. + + Parameters + ---------- + key : str + The input name + value : array_like. + The input value + """ + v = self._get_input(key) + if v is None: + raise RuntimeError("Could not find '%s' in pipeline's inputs" % key) + v.copyfrom(value) + def set_params(self, params_group_name, params_data): """Set the parameter group value given the parameter group name. Note that the parameter group name is declared in the pipeline executor config. @@ -163,6 +190,19 @@ def set_params(self, params_group_name, params_data): for key, val in params_data.items(): self._set_param(params_group_name, key, val) + def get_input(self, key): + """Get the input via an input name. + Parameters + ---------- + key : str + The input key + Returns + ------- + data : NDArray + The input data. + """ + return self._get_input(key) + @property def num_outputs(self): """Get the number of outputs. diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc index 0ca291a2fbbe..30c09514480f 100644 --- a/src/runtime/pipeline/pipeline_executor.cc +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -58,6 +58,26 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name, LOG(FATAL) << "Function only support the parameter name and the key in the form of string"; } }); + } else if (name == "set_input") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + if (String::CanConvertFrom(args[0])) { + this->SetInput(args[0].operator String(), args[1]); + } else { + LOG(FATAL) << "Function only support the input name value in the form of string"; + } + }); + } else if (name == "get_input") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + if (String::CanConvertFrom(args[0])) { + *rv = this->GetInput(args[0].operator String()); + } else { + LOG(FATAL) << "Function only support the input name value in the form of string"; + } + }); + } else if (name == "run") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); }); + } else if (name == "stop") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Stop(); }); } else { LOG(FATAL) << "Unknown packed function: " << name; return PackedFunc(); @@ -65,6 +85,32 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name, return nullptr; } +/*! + * \brief set input to the runtime module. + * \param input_name The input name. + * \param data_in The input data. + */ +void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) { + std::pair indexs = this->GetInputIndex(input_name); + if (indexs.first < 0 || indexs.first >= static_cast(runtimes_.size())) { + this->Stop(); + LOG(FATAL) << "input name " << input_name << " not found."; + } + runtimes_[indexs.first]->SetInput(indexs.second, data_in); +} +/*! + * \brief get input from the runtime module. + * \param input_name The input name. + * \return Return the input data for a specific input name. + */ +NDArray PipelineExecutor::GetInput(std::string input_name) { + std::pair indexs = this->GetInputIndex(input_name); + if (indexs.first < 0 || indexs.first >= static_cast(runtimes_.size())) { + this->Stop(); + LOG(FATAL) << "input name " << input_name << " not found."; + } + return runtimes_[indexs.first]->GetInput(indexs.second); +} /*! * \brief Using the global input name to get the index, and also get the input interface name of corresponding subgraph from the input connection configuration. @@ -85,6 +131,20 @@ int PipelineExecutor::GetParamsGroupPipelineMap(const std::string& name) { return param_connection_config[name]; } +/*! + * \brief Run the pipeline executor. + * \param serialized_mode Whether run the pipeline executor in serialized mode. + */ +void PipelineExecutor::Run(bool serialized_mode) { + // TODO(huajsj): Run the pipeline executor. +} +/*! + * \brief Stop the pipeline executor. + */ +void PipelineExecutor::Stop() { + // TODO(huajsj): Stop the pipeline executor. +} + /*! * \brief Use the mod_config information to create a graph runtime list. * \param mod_config The config information that generates by the export library function call. @@ -152,6 +212,16 @@ void PipelineExecutor::SetParam(std::string param_group_name, std::string param_ int module_index = this->GetParamsGroupPipelineMap(param_group_name); // TODO(huajsj): set the parameters into runtime module. } +/*! + * \brief Return the input index and module index for a given input name. + * \param name The input name. + * \return std::pair A pair of module index and the input index. + */ +std::pair PipelineExecutor::GetInputIndex(const std::string& name) { + std::pair index = input_connection_config[name]; + auto gruntime = runtimes_[index.first]; + return std::make_pair(index.first, gruntime->GetInputIndex(index.second)); +} /*! * \brief Initialize the pipeline executor with a list of modules to be pipelined * and config in JSON format. @@ -165,9 +235,10 @@ void PipelineExecutor::Init(const std::vector& modules, const std::strin dmlc::JSONReader reader(&is); this->LoadConfig(&reader); ICHECK(!pipeline_config_.Empty()) << "The pipeline config information is empty."; + num_outputs_ = pipeline_config_.GetGlobalOutputNum(); // Initialize the pipeline function class used for pipeline thread pool management - // and schedule etc. This function returns the number of output. - num_outputs_ = pipeline_scheduler_.PipelineInit(modules, pipeline_config_); + // and schedule etc. This function returns a list of runtime. + runtimes_ = pipeline_scheduler_.PipelineInit(modules, pipeline_config_); return; } diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h index 6d4c7ba1fa4f..7dc5baf17ee1 100644 --- a/src/runtime/pipeline/pipeline_executor.h +++ b/src/runtime/pipeline/pipeline_executor.h @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -82,6 +83,18 @@ class TVM_DLL PipelineExecutor : public ModuleNode { * \return Returning a runtime module index. */ int GetParamsGroupPipelineMap(const std::string& name); + /*! + * \brief Use the input name to set the input data of pipeline executor. + * \param input_name The input name. + * \param data_in The input data. + */ + void SetInput(std::string input_name, DLTensor* data_in); + /*! + * \brief Use the input name to get the input data. + * \param input name The input name. + * \return Return input data. + */ + NDArray GetInput(std::string input_name); /*! * \brief Use the parameters group name to get the specific backend runtime then use * the param_key_name to set param data for the said backend runtime. @@ -96,6 +109,22 @@ class TVM_DLL PipelineExecutor : public ModuleNode { * \return The number of outputs. */ int NumOutputs() const { return num_outputs_; } + /*! + * \brief Run the pipeline executor. + * \param serialized_mode Whether run the pipeline executor in serialized mode. + */ + void Run(bool serialized_mode); + /*! + * \brief Stop the pipeline executor. + */ + void Stop(); + /*! + * \brief A pipeline input with a specific name correspond with a input of a specific + * backend module, this function return a module index and a input index in "pair" + * form for a input name. + * return Return a module index and a input index. + */ + std::pair GetInputIndex(const std::string& name); /*!\brief Load the module files information.*/ ModuleConfig& LoadModuleConfig(dmlc::JSONReader* reader) { reader->BeginArray(); @@ -145,6 +174,8 @@ class TVM_DLL PipelineExecutor : public ModuleNode { ModuleConfig mod_config_; /*!\brief How many outputs are in this pipeline executor.*/ size_t num_outputs_ = 0; + /*!The list of backend runtime module.*/ + std::vector> runtimes_; /*!\brief Json loader.*/ void LoadConfig(dmlc::JSONReader* reader) { reader->BeginObject(); diff --git a/src/runtime/pipeline/pipeline_scheduler.cc b/src/runtime/pipeline/pipeline_scheduler.cc index 67a9795c47d4..499d75784a15 100644 --- a/src/runtime/pipeline/pipeline_scheduler.cc +++ b/src/runtime/pipeline/pipeline_scheduler.cc @@ -27,11 +27,15 @@ namespace runtime { * \param modules The list of graph executor modules. * \param pipeline_conf The dependency information of each graph executor module. */ -size_t PipelineScheduler::PipelineInit(const std::vector& modules, - const ConfigPipelineExecution& pipeline_config) { +std::vector> PipelineScheduler::PipelineInit( + const std::vector& modules, const ConfigPipelineExecution& pipeline_config) { + std::vector> runtimes; graph_modules_ = modules; - int num_output = pipeline_config.GetGlobalOutputNum(); - return num_output; + for (size_t i = 0; i < graph_modules_.size(); i++) { + auto runItem = std::make_shared(graph_modules_[i], i); + runtimes.push_back(runItem); + } + return runtimes; } } // namespace runtime } // namespace tvm diff --git a/src/runtime/pipeline/pipeline_scheduler.h b/src/runtime/pipeline/pipeline_scheduler.h index 0572e060a1b8..02c44420bd51 100644 --- a/src/runtime/pipeline/pipeline_scheduler.h +++ b/src/runtime/pipeline/pipeline_scheduler.h @@ -41,8 +41,8 @@ class PipelineScheduler { * \param modules The list of graph executor module. * \param pipeline_config The dependency information of each graph executor module. */ - size_t PipelineInit(const std::vector& modules, - const ConfigPipelineExecution& pipeline_config); + std::vector> PipelineInit( + const std::vector& modules, const ConfigPipelineExecution& pipeline_config); private: /*!\brief The list of graph executors.*/ diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index aa831070ccdb..40628e989a90 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -21,12 +21,16 @@ #include #include #include +#include +#include #include #include #include #include #include +namespace tvm { +namespace runtime { /*! * \brief All binding information of a output interface. */ @@ -292,7 +296,106 @@ struct ParamConnectionConfig { } } }; +/* + *\brief Backend Runtime. + */ +class BackendRuntime { + private: + /*\brief The index of runtime indicates the runtime position in the pipeline.*/ + int runtime_idx_; + /*\brief The Runtime module of a backend graph executor.*/ + Module module_; + /*! + *\brief In order to transfer data from one backend runtime to another, we need a local + * tensor variable as a medium. "input_tensor_local_copy_" is a map including + * input data and local tensor vairable. + */ + std::unordered_map input_tensor_local_copy_; + /*!\brief The packed functions.*/ + tvm::runtime::PackedFunc set_input_; + tvm::runtime::PackedFunc get_input_; + tvm::runtime::PackedFunc get_num_output_; + tvm::runtime::PackedFunc get_num_inputs_; + tvm::runtime::PackedFunc get_input_index_; + /*! + * \brief Copying from a given tensor and using 'CPU' as the device. + */ + inline DLTensor* CopyDLTensorToCPU(const DLTensor* from) { + DLTensor* ret = NULL; + TVMArrayAlloc(from->shape, from->ndim, from->dtype.code, from->dtype.bits, from->dtype.lanes, + kDLCPU, 0, &ret); + return ret; + } + /*!\brief Creating a new NDArray with same shape and data type as the given DLTensor.*/ + NDArray CreateNDArrayFromDLTensor(const DLTensor* from) { + std::vector shape; + for (int i = 0; i < from->ndim; i++) { + shape.push_back(from->shape[i]); + } + auto ndarray = NDArray::Empty(shape, from->dtype, from->device); + ndarray.CreateView(shape, from->dtype); + return ndarray; + } + /* + *\brief Copying data from one DLTensor to another. + */ + void CopyFromTo(DLTensor* from, DLTensor* to) { + // When the 'from' device and the 'to' device are not the same, we use a temporary CPU + // DLTensor as the bridge. + if (from->device.device_type != to->device.device_type && from->device.device_type != kDLCPU && + to->device.device_type != kDLCPU) { + DLTensor* dltensor_local = nullptr; + if (input_tensor_local_copy_.find(to) == input_tensor_local_copy_.end()) { + dltensor_local = CopyDLTensorToCPU(from); + input_tensor_local_copy_[to] = dltensor_local; + } else { + dltensor_local = input_tensor_local_copy_[to]; + } + TVMArrayCopyFromTo(from, dltensor_local, nullptr); + from = dltensor_local; + } + TVMArrayCopyFromTo(from, to, nullptr); + } + + public: + BackendRuntime(Module mod, int mod_idx) { + module_ = mod; + runtime_idx_ = mod_idx; + get_input_index_ = module_.GetFunction("get_input_index"); + get_num_output_ = module_.GetFunction("get_num_outputs"); + get_num_inputs_ = module_.GetFunction("get_num_inputs"); + set_input_ = module_.GetFunction("set_input"); + get_input_ = module_.GetFunction("get_input"); + } + BackendRuntime(void) {} + ~BackendRuntime() { + for (auto data : input_tensor_local_copy_) { + TVMArrayFree(data.second); + } + } + /*!\brief Return the index of the current module.*/ + int GetModuleIndex() { return runtime_idx_; } + /*!\brief Return the number of output*/ + int NumOutputs() const { return get_num_output_(); } + /*!\brief Return the number of input*/ + int NumInputs() const { return get_num_inputs_(); } + /*!\brief Setting the data to this module via input index.*/ + void SetInput(const int index, DLTensor* data_in) { + NDArray input = get_input_(index); + DLTensor* dltensor_input = const_cast(input.operator->()); + CopyFromTo(data_in, dltensor_input); + } + /*!\brief Setting the data to the current runtime moduel via the input name. */ + void SetInput(const std::string name, DLTensor* data_in) { + int index = this->GetInputIndex(name); + SetInput(index, data_in); + } + /*!\brief Getting the input data via the input index.*/ + NDArray GetInput(int index) const { return get_input_(index); } + /*!\bief Getting the input data via the input name.*/ + int GetInputIndex(const std::string& name) { return get_input_index_(name); } +}; /*! * \brief The information used to initialize the graph executor module, the information * come from the export library function call. @@ -309,4 +412,6 @@ struct GraphModuleLoadInfo { }; /*! The Module information of each module.The 'int' is module index. */ using ModuleConfig = std::unordered_map; +}; // namespace runtime +}; // namespace tvm #endif // TVM_RUNTIME_PIPELINE_PIPELINE_STRUCT_H_ diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py index 83cf237dbfcc..99c24ef93b80 100644 --- a/tests/python/relay/test_pipeline_executor.py +++ b/tests/python/relay/test_pipeline_executor.py @@ -292,8 +292,17 @@ def test_pipeline(): assert input_map[0] == "0" and input_map[1] == "data_0" module_index = pipeline_module_test.get_params_group_pipeline_map("param_0") assert module_index == 1 - # Use the parameters group name to set parameters. + # Using the parameters group name to set parameters. pipeline_module_test.set_params("param_0", customized_parameters) + # Getting the result from the pipeline executor + data_a = np.full(dshape, 1).astype("float32") + data_b = np.full(dshape, 2).astype("float32") + pipeline_module_test.set_input("data_a", data_a) + pipeline_module_test.set_input("data_b", data_b) + input_data = pipeline_module_test.get_input("data_b") + tvm.testing.assert_allclose(data_b, input_data.numpy()) + input_data = pipeline_module_test.get_input("data_a") + tvm.testing.assert_allclose(data_a, input_data.numpy()) if __name__ == "__main__":