Skip to content

Commit

Permalink
[Runtime][PipelineExecutor] Add Pipeline Executor Interface (#10010)
Browse files Browse the repository at this point in the history
Adding interfaces to the Pipeline Executor to "run", "stop", "set input",
and "get input" from the pipeline executor.

In this patch, we also implemented the "BackendRuntime" structure to
wrap the graph runtime interface in order to support the pipeline executor
interface, and implemented a data-copy method. This method is used to
transfer data between two backend runtimes.
  • Loading branch information
huajsj authored Jan 25, 2022
1 parent 73bbfbb commit 6720d35
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 11 deletions.
44 changes: 42 additions & 2 deletions python/tvm/contrib/pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,22 @@ def __init__(self, module):
else:
self.module = module
# Get the packed functions from the pipeline executor.
self._get_num_outputs = self.module["get_num_outputs"]
self._get_input_pipeline_map = self.module["get_input_pipeline_map"]
self._get_params_group_pipeline_map = self.module["get_params_group_pipeline_map"]
self._run = self.module["run"]
self._stop = self.module["stop"]
self._set_param = self.module["set_param"]
self._set_input = self.module["set_input"]
self._get_input = self.module["get_input"]
self._get_num_outputs = self.module["get_num_outputs"]
self._get_input_pipeline_map = self.module["get_input_pipeline_map"]

def run(self, sync=False):
    """Run the pipeline executor.

    Parameters
    ----------
    sync : bool
        Forwarded to the "run" packed function; on the C++ side it arrives as
        the ``serialized_mode`` argument of ``PipelineExecutor::Run``.
    """
    self._run(sync)

def stop(self):
    """Stop the pipeline executor.

    Forwards to the "stop" packed function of the underlying pipeline
    executor module.
    """
    self._stop()

def get_input_pipeline_map(self, name):
"""Using the "name" to get the corresponding subgraph index and also get the "input name"
Expand All @@ -145,6 +157,21 @@ def get_params_group_pipeline_map(self, name):
"""
return self._get_params_group_pipeline_map(name)

def set_input(self, key, value):
    """Set the input via input name.

    Parameters
    ----------
    key : str
        The input name
    value : array_like.
        The input value

    Raises
    ------
    RuntimeError
        If "key" does not name a known pipeline input.
    """
    # Look up the backing NDArray for this input and copy the new value in.
    input_array = self._get_input(key)
    if input_array is not None:
        input_array.copyfrom(value)
    else:
        raise RuntimeError("Could not find '%s' in pipeline's inputs" % key)

def set_params(self, params_group_name, params_data):
"""Set the parameter group value given the parameter group name. Note that the parameter
group name is declared in the pipeline executor config.
Expand All @@ -163,6 +190,19 @@ def set_params(self, params_group_name, params_data):
for key, val in params_data.items():
self._set_param(params_group_name, key, val)

def get_input(self, key):
    """Get the input via an input name.

    Parameters
    ----------
    key : str
        The input key

    Returns
    -------
    data : NDArray
        The input data, or None if "key" is not a known pipeline input
        (see the corresponding check in ``set_input``).
    """
    return self._get_input(key)

@property
def num_outputs(self):
"""Get the number of outputs.
Expand Down
75 changes: 73 additions & 2 deletions src/runtime/pipeline/pipeline_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,59 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name,
LOG(FATAL) << "Function only support the parameter name and the key in the form of string";
}
});
} else if (name == "set_input") {
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) {
if (String::CanConvertFrom(args[0])) {
this->SetInput(args[0].operator String(), args[1]);
} else {
LOG(FATAL) << "Function only support the input name value in the form of string";
}
});
} else if (name == "get_input") {
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) {
if (String::CanConvertFrom(args[0])) {
*rv = this->GetInput(args[0].operator String());
} else {
LOG(FATAL) << "Function only support the input name value in the form of string";
}
});
} else if (name == "run") {
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); });
} else if (name == "stop") {
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Stop(); });
} else {
LOG(FATAL) << "Unknown packed function: " << name;
return PackedFunc();
}
return nullptr;
}

/*!
 * \brief set input to the runtime module.
 * \param input_name The input name.
 * \param data_in The input data.
 */
void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) {
  // Resolve the global input name into {module index, module-local input index}.
  const std::pair<int, int> index_pair = this->GetInputIndex(input_name);
  const int module_index = index_pair.first;
  const int input_index = index_pair.second;
  const bool in_range =
      module_index >= 0 && module_index < static_cast<int>(runtimes_.size());
  if (!in_range) {
    this->Stop();
    LOG(FATAL) << "input name " << input_name << " not found.";
  }
  runtimes_[module_index]->SetInput(input_index, data_in);
}
/*!
 * \brief get input from the runtime module.
 * \param input_name The input name.
 * \return Return the input data for a specific input name.
 */
NDArray PipelineExecutor::GetInput(std::string input_name) {
  // Resolve the global input name into {module index, module-local input index}.
  const std::pair<int, int> index_pair = this->GetInputIndex(input_name);
  const int module_index = index_pair.first;
  const bool in_range =
      module_index >= 0 && module_index < static_cast<int>(runtimes_.size());
  if (!in_range) {
    this->Stop();
    LOG(FATAL) << "input name " << input_name << " not found.";
  }
  return runtimes_[module_index]->GetInput(index_pair.second);
}
/*!
* \brief Using the global input name to get the index, and also get the input interface name
of corresponding subgraph from the input connection configuration.
Expand All @@ -85,6 +131,20 @@ int PipelineExecutor::GetParamsGroupPipelineMap(const std::string& name) {
return param_connection_config[name];
}

/*!
 * \brief Run the pipeline executor.
 * \param serialized_mode Whether run the pipeline executor in serialized mode.
 */
void PipelineExecutor::Run(bool serialized_mode) {
  // TODO(huajsj): Run the pipeline executor.
  // NOTE(review): 'serialized_mode' is currently unused; it arrives from the "run"
  // packed function (args[0]) and presumably selects sequential vs. pipelined
  // execution once implemented — confirm when the TODO is resolved.
}
/*!
 * \brief Stop the pipeline executor.
 */
void PipelineExecutor::Stop() {
  // TODO(huajsj): Stop the pipeline executor.
  // Currently a stub; also invoked from SetInput/GetInput before LOG(FATAL)
  // so the pipeline can shut down ahead of the hard abort.
}

/*!
* \brief Use the mod_config information to create a graph runtime list.
* \param mod_config The config information that generates by the export library function call.
Expand Down Expand Up @@ -152,6 +212,16 @@ void PipelineExecutor::SetParam(std::string param_group_name, std::string param_
int module_index = this->GetParamsGroupPipelineMap(param_group_name);
// TODO(huajsj): set the parameters into runtime module.
}
/*!
* \brief Return the input index and module index for a given input name.
* \param name The input name.
* \return std::pair<int, int> A pair of module index and the input index.
*/
std::pair<int, int> PipelineExecutor::GetInputIndex(const std::string& name) {
std::pair<int, std::string> index = input_connection_config[name];
auto gruntime = runtimes_[index.first];
return std::make_pair(index.first, gruntime->GetInputIndex(index.second));
}
/*!
* \brief Initialize the pipeline executor with a list of modules to be pipelined
* and config in JSON format.
Expand All @@ -165,9 +235,10 @@ void PipelineExecutor::Init(const std::vector<Module>& modules, const std::strin
dmlc::JSONReader reader(&is);
this->LoadConfig(&reader);
ICHECK(!pipeline_config_.Empty()) << "The pipeline config information is empty.";
num_outputs_ = pipeline_config_.GetGlobalOutputNum();
// Initialize the pipeline function class used for pipeline thread pool management
// and schedule etc. This function returns the number of output.
num_outputs_ = pipeline_scheduler_.PipelineInit(modules, pipeline_config_);
// and schedule etc. This function returns a list of runtime.
runtimes_ = pipeline_scheduler_.PipelineInit(modules, pipeline_config_);
return;
}

Expand Down
31 changes: 31 additions & 0 deletions src/runtime/pipeline/pipeline_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <array>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
Expand Down Expand Up @@ -82,6 +83,18 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
* \return Returning a runtime module index.
*/
int GetParamsGroupPipelineMap(const std::string& name);
/*!
* \brief Use the input name to set the input data of pipeline executor.
* \param input_name The input name.
* \param data_in The input data.
*/
void SetInput(std::string input_name, DLTensor* data_in);
/*!
* \brief Use the input name to get the input data.
* \param input name The input name.
* \return Return input data.
*/
NDArray GetInput(std::string input_name);
/*!
* \brief Use the parameters group name to get the specific backend runtime then use
* the param_key_name to set param data for the said backend runtime.
Expand All @@ -96,6 +109,22 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
* \return The number of outputs.
*/
int NumOutputs() const { return num_outputs_; }
/*!
* \brief Run the pipeline executor.
* \param serialized_mode Whether run the pipeline executor in serialized mode.
*/
void Run(bool serialized_mode);
/*!
* \brief Stop the pipeline executor.
*/
void Stop();
/*!
 * \brief A pipeline input with a specific name corresponds with an input of a specific
 * backend module; this function returns the module index and the input index as a
 * "pair" for a given input name.
 * \return A pair of the module index and the input index.
 */
std::pair<int, int> GetInputIndex(const std::string& name);
/*!\brief Load the module files information.*/
ModuleConfig& LoadModuleConfig(dmlc::JSONReader* reader) {
reader->BeginArray();
Expand Down Expand Up @@ -145,6 +174,8 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
ModuleConfig mod_config_;
/*!\brief How many outputs are in this pipeline executor.*/
size_t num_outputs_ = 0;
/*!The list of backend runtime module.*/
std::vector<std::shared_ptr<BackendRuntime>> runtimes_;
/*!\brief Json loader.*/
void LoadConfig(dmlc::JSONReader* reader) {
reader->BeginObject();
Expand Down
12 changes: 8 additions & 4 deletions src/runtime/pipeline/pipeline_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ namespace runtime {
* \param modules The list of graph executor modules.
* \param pipeline_conf The dependency information of each graph executor module.
*/
size_t PipelineScheduler::PipelineInit(const std::vector<Module>& modules,
const ConfigPipelineExecution& pipeline_config) {
std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
const std::vector<Module>& modules, const ConfigPipelineExecution& pipeline_config) {
std::vector<std::shared_ptr<BackendRuntime>> runtimes;
graph_modules_ = modules;
int num_output = pipeline_config.GetGlobalOutputNum();
return num_output;
for (size_t i = 0; i < graph_modules_.size(); i++) {
auto runItem = std::make_shared<BackendRuntime>(graph_modules_[i], i);
runtimes.push_back(runItem);
}
return runtimes;
}
} // namespace runtime
} // namespace tvm
4 changes: 2 additions & 2 deletions src/runtime/pipeline/pipeline_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class PipelineScheduler {
* \param modules The list of graph executor module.
* \param pipeline_config The dependency information of each graph executor module.
*/
size_t PipelineInit(const std::vector<Module>& modules,
const ConfigPipelineExecution& pipeline_config);
std::vector<std::shared_ptr<BackendRuntime>> PipelineInit(
const std::vector<Module>& modules, const ConfigPipelineExecution& pipeline_config);

private:
/*!\brief The list of graph executors.*/
Expand Down
105 changes: 105 additions & 0 deletions src/runtime/pipeline/pipeline_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
#include <assert.h>
#include <dlpack/dlpack.h>
#include <dmlc/json.h>
#include <tvm/runtime/ndarray.h>
#include <tvm/runtime/packed_func.h>

#include <limits>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
namespace tvm {
namespace runtime {
/*!
* \brief All binding information of a output interface.
*/
Expand Down Expand Up @@ -292,7 +296,106 @@ struct ParamConnectionConfig {
}
}
};
/*
*\brief Backend Runtime.
*/
class BackendRuntime {
private:
/*\brief The index of runtime indicates the runtime position in the pipeline.*/
int runtime_idx_;
/*\brief The Runtime module of a backend graph executor.*/
Module module_;
/*!
*\brief In order to transfer data from one backend runtime to another, we need a local
* tensor variable as a medium. "input_tensor_local_copy_" is a map including
* input data and local tensor vairable.
*/
std::unordered_map<DLTensor*, DLTensor*> input_tensor_local_copy_;
/*!\brief The packed functions.*/
tvm::runtime::PackedFunc set_input_;
tvm::runtime::PackedFunc get_input_;
tvm::runtime::PackedFunc get_num_output_;
tvm::runtime::PackedFunc get_num_inputs_;
tvm::runtime::PackedFunc get_input_index_;
/*!
* \brief Copying from a given tensor and using 'CPU' as the device.
*/
inline DLTensor* CopyDLTensorToCPU(const DLTensor* from) {
DLTensor* ret = NULL;
TVMArrayAlloc(from->shape, from->ndim, from->dtype.code, from->dtype.bits, from->dtype.lanes,
kDLCPU, 0, &ret);
return ret;
}
/*!\brief Creating a new NDArray with same shape and data type as the given DLTensor.*/
NDArray CreateNDArrayFromDLTensor(const DLTensor* from) {
std::vector<int64_t> shape;
for (int i = 0; i < from->ndim; i++) {
shape.push_back(from->shape[i]);
}
auto ndarray = NDArray::Empty(shape, from->dtype, from->device);
ndarray.CreateView(shape, from->dtype);
return ndarray;
}
/*
*\brief Copying data from one DLTensor to another.
*/
void CopyFromTo(DLTensor* from, DLTensor* to) {
// When the 'from' device and the 'to' device are not the same, we use a temporary CPU
// DLTensor as the bridge.
if (from->device.device_type != to->device.device_type && from->device.device_type != kDLCPU &&
to->device.device_type != kDLCPU) {
DLTensor* dltensor_local = nullptr;
if (input_tensor_local_copy_.find(to) == input_tensor_local_copy_.end()) {
dltensor_local = CopyDLTensorToCPU(from);
input_tensor_local_copy_[to] = dltensor_local;
} else {
dltensor_local = input_tensor_local_copy_[to];
}
TVMArrayCopyFromTo(from, dltensor_local, nullptr);
from = dltensor_local;
}

TVMArrayCopyFromTo(from, to, nullptr);
}

public:
BackendRuntime(Module mod, int mod_idx) {
module_ = mod;
runtime_idx_ = mod_idx;
get_input_index_ = module_.GetFunction("get_input_index");
get_num_output_ = module_.GetFunction("get_num_outputs");
get_num_inputs_ = module_.GetFunction("get_num_inputs");
set_input_ = module_.GetFunction("set_input");
get_input_ = module_.GetFunction("get_input");
}
BackendRuntime(void) {}
~BackendRuntime() {
for (auto data : input_tensor_local_copy_) {
TVMArrayFree(data.second);
}
}
/*!\brief Return the index of the current module.*/
int GetModuleIndex() { return runtime_idx_; }
/*!\brief Return the number of output*/
int NumOutputs() const { return get_num_output_(); }
/*!\brief Return the number of input*/
int NumInputs() const { return get_num_inputs_(); }
/*!\brief Setting the data to this module via input index.*/
void SetInput(const int index, DLTensor* data_in) {
NDArray input = get_input_(index);
DLTensor* dltensor_input = const_cast<DLTensor*>(input.operator->());
CopyFromTo(data_in, dltensor_input);
}
/*!\brief Setting the data to the current runtime moduel via the input name. */
void SetInput(const std::string name, DLTensor* data_in) {
int index = this->GetInputIndex(name);
SetInput(index, data_in);
}
/*!\brief Getting the input data via the input index.*/
NDArray GetInput(int index) const { return get_input_(index); }
/*!\bief Getting the input data via the input name.*/
int GetInputIndex(const std::string& name) { return get_input_index_(name); }
};
/*!
* \brief The information used to initialize the graph executor module, the information
* come from the export library function call.
Expand All @@ -309,4 +412,6 @@ struct GraphModuleLoadInfo {
};
/*! The Module information of each module.The 'int' is module index. */
using ModuleConfig = std::unordered_map<int, GraphModuleLoadInfo>;
}; // namespace runtime
}; // namespace tvm
#endif // TVM_RUNTIME_PIPELINE_PIPELINE_STRUCT_H_
Loading

0 comments on commit 6720d35

Please sign in to comment.