From 243a8fd907610865df64e9179bfcc31c0ed3f20e Mon Sep 17 00:00:00 2001 From: Chen Yufei Date: Thu, 24 Dec 2020 20:08:55 +0800 Subject: [PATCH 1/3] Support multiple train data on single machine. --- docs/Parameters.rst | 4 +- include/LightGBM/config.h | 6 +- include/LightGBM/dataset.h | 2 +- include/LightGBM/dataset_loader.h | 30 ++++-- include/LightGBM/utils/pipeline_reader.h | 1 + include/LightGBM/utils/text_reader.h | 132 ++++++++++++++++------- src/application/application.cpp | 29 +++-- src/io/config.cpp | 8 +- src/io/config_auto.cpp | 6 +- src/io/dataset.cpp | 8 +- src/io/dataset_loader.cpp | 50 +++++---- 11 files changed, 188 insertions(+), 88 deletions(-) diff --git a/docs/Parameters.rst b/docs/Parameters.rst index 9b6688db4337..1b9e087f4a6e 100644 --- a/docs/Parameters.rst +++ b/docs/Parameters.rst @@ -141,7 +141,9 @@ Core Parameters - ``data`` :raw-html:`🔗︎`, default = ``""``, type = string, aliases: ``train``, ``train_data``, ``train_data_file``, ``data_filename`` - - path of training data, LightGBM will train from this data + - path of training data, LightGBM will train from these data + + - support multiple train data, separated by ``,`` - **Note**: can be used only in CLI version diff --git a/include/LightGBM/config.h b/include/LightGBM/config.h index 6fccf7b39764..7c0c6faafe6d 100644 --- a/include/LightGBM/config.h +++ b/include/LightGBM/config.h @@ -161,9 +161,11 @@ struct Config { bool linear_tree = false; // alias = train, train_data, train_data_file, data_filename - // desc = path of training data, LightGBM will train from this data + // default = "" + // desc = path of training data, LightGBM will train from these data + // desc = support multiple train data, separated by ``,`` // desc = **Note**: can be used only in CLI version - std::string data = ""; + std::vector data; // alias = test, valid_data, valid_data_file, test_data, test_data_file, valid_filenames // default = "" diff --git a/include/LightGBM/dataset.h b/include/LightGBM/dataset.h index 90f48e70c744..f55c101f79ca 100644 --- a/include/LightGBM/dataset.h +++ b/include/LightGBM/dataset.h @@ -679,7 +679,7 @@ class Dataset { } private: - std::string data_filename_; + std::vector data_filename_; /*! \brief Store used features */ std::vector> feature_groups_; /*! \brief Mapper from real feature index to used index*/ diff --git a/include/LightGBM/dataset_loader.h b/include/LightGBM/dataset_loader.h index e72dd4910804..a7ac737efa38 100644 --- a/include/LightGBM/dataset_loader.h +++ b/include/LightGBM/dataset_loader.h @@ -15,11 +15,17 @@ namespace LightGBM { class DatasetLoader { public: + LIGHTGBM_EXPORT DatasetLoader(const Config& io_config, const PredictFunction& predict_fun, int num_class, const std::vector& filenames); + LIGHTGBM_EXPORT DatasetLoader(const Config& io_config, const PredictFunction& predict_fun, int num_class, const char* filename); LIGHTGBM_EXPORT ~DatasetLoader(); - LIGHTGBM_EXPORT Dataset* LoadFromFile(const char* filename, int rank, int num_machines); + LIGHTGBM_EXPORT Dataset* LoadFromFile(const std::vector& filenames, int rank, int num_machines); + + LIGHTGBM_EXPORT Dataset* LoadFromFile(const char* filename, int rank, int num_machines) { + return LoadFromFile(std::vector{filename}, rank, num_machines); + } LIGHTGBM_EXPORT Dataset* LoadFromFile(const char* filename) { return LoadFromFile(filename, 0, 1); @@ -40,17 +46,25 @@ class DatasetLoader { const std::unordered_set& categorical_features); private: - Dataset* LoadFromBinFile(const char* data_filename, const char* bin_filename, int rank, int num_machines, int* num_global_data, std::vector* used_data_indices); + Dataset* LoadFromBinFile(const std::vector& data_filename, const char* bin_filename, int rank, int num_machines, int* num_global_data, std::vector* used_data_indices); - void SetHeader(const char* filename); + void SetHeader(const char* filenames); void CheckDataset(const Dataset* dataset, bool is_load_from_binary); - std::vector LoadTextDataToMemory(const char* filename, const Metadata& metadata, int rank, int num_machines, int* num_global_data, std::vector* used_data_indices); + std::vector LoadTextDataToMemory(const std::vector& filename, const Metadata& metadata, int rank, int num_machines, int* num_global_data, std::vector* used_data_indices); + + std::vector LoadTextDataToMemory(const char* filename, const Metadata& metadata, int rank, int num_machines, int* num_global_data, std::vector* used_data_indices) { + return LoadTextDataToMemory(std::vector{filename}, metadata, rank, num_machines, num_global_data, used_data_indices); + } std::vector SampleTextDataFromMemory(const std::vector& data); - std::vector SampleTextDataFromFile(const char* filename, const Metadata& metadata, int rank, int num_machines, int* num_global_data, std::vector* used_data_indices); + std::vector SampleTextDataFromFile(const std::vector& filenames, const Metadata& metadata, int rank, int num_machines, int* num_global_data, std::vector* used_data_indices); + + std::vector SampleTextDataFromFile(const char* filename, const Metadata& metadata, int rank, int num_machines, int* num_global_data, std::vector* used_data_indices) { + return SampleTextDataFromFile(std::vector{filename}, metadata, rank, num_machines, num_global_data, used_data_indices); + } void ConstructBinMappersFromTextData(int rank, int num_machines, const std::vector& sample_data, const Parser* parser, Dataset* dataset); @@ -58,7 +72,11 @@ class DatasetLoader { void ExtractFeaturesFromMemory(std::vector* text_data, const Parser* parser, Dataset* dataset); /*! \brief Extract local features from file */ - void ExtractFeaturesFromFile(const char* filename, const Parser* parser, const std::vector& used_data_indices, Dataset* dataset); + void ExtractFeaturesFromFile(const std::vector& filenames, const Parser* parser, const std::vector& used_data_indices, Dataset* dataset); + + void ExtractFeaturesFromFile(const char* filename, const Parser* parser, const std::vector& used_data_indices, Dataset* dataset) { + ExtractFeaturesFromFile(std::vector{filename}, parser, used_data_indices, dataset); + } /*! \brief Check can load from binary file */ std::string CheckCanLoadFromBin(const char* filename); diff --git a/include/LightGBM/utils/pipeline_reader.h b/include/LightGBM/utils/pipeline_reader.h index f02500c9751a..0526f9c5ffea 100644 --- a/include/LightGBM/utils/pipeline_reader.h +++ b/include/LightGBM/utils/pipeline_reader.h @@ -43,6 +43,7 @@ class PipelineReader { if (skip_bytes > 0) { // skip first k bytes read_cnt = reader->Read(buffer_process.data(), skip_bytes); + Log::Debug("Skipped header \"%s\" in file %s", std::string(buffer_process.data(), read_cnt).c_str(), filename); } // read first block read_cnt = reader->Read(buffer_process.data(), buffer_size); diff --git a/include/LightGBM/utils/text_reader.h b/include/LightGBM/utils/text_reader.h index 638bb2683627..3c2dae36ad94 100644 --- a/include/LightGBM/utils/text_reader.h +++ b/include/LightGBM/utils/text_reader.h @@ -30,36 +30,25 @@ class TextReader { * \param filename Filename of data * \param is_skip_first_line True if need to skip header */ - TextReader(const char* filename, bool is_skip_first_line, size_t progress_interval_bytes = SIZE_MAX): - filename_(filename), is_skip_first_line_(is_skip_first_line), read_progress_interval_bytes_(progress_interval_bytes) { - if (is_skip_first_line_) { - auto reader = VirtualFileReader::Make(filename); - if (!reader->Init()) { - Log::Fatal("Could not open %s", filename); - } - std::stringstream str_buf; - char read_c; - size_t nread = reader->Read(&read_c, 1); - while (nread == 1) { - if (read_c == '\n' || read_c == '\r') { - break; + TextReader(const std::vector& filenames, bool is_skip_first_line, size_t progress_interval_bytes = SIZE_MAX): + filename_(filenames), is_skip_first_line_(is_skip_first_line), read_progress_interval_bytes_(progress_interval_bytes) { + if (is_skip_first_line) { + first_line_ = CountFirstLineBytes(filename_[0], &skip_bytes_); + // Check if every file have same length of the first line. + for (size_t i = 1; i < filenames.size(); ++i) { + int n_bytes = 0; + CountFirstLineBytes(filename_[i], &n_bytes); + if (n_bytes != skip_bytes_) { + Log::Fatal("file %s first line has %d bytes, not equal to first file which have %d bytes", + filename_[i], n_bytes, skip_bytes_); } - str_buf << read_c; - ++skip_bytes_; - nread = reader->Read(&read_c, 1); - } - if (read_c == '\r') { - reader->Read(&read_c, 1); - ++skip_bytes_; } - if (read_c == '\n') { - reader->Read(&read_c, 1); - ++skip_bytes_; - } - first_line_ = str_buf.str(); - Log::Debug("Skipped header \"%s\" in file %s", first_line_.c_str(), filename_); } } + + TextReader(const char* filename, bool is_skip_first_line, size_t progress_interval_bytes = SIZE_MAX): + TextReader(std::vector{filename}, is_skip_first_line, progress_interval_bytes) { + } /*! * \brief Destructor */ @@ -67,6 +56,35 @@ class TextReader { Clear(); } /*! + * \brief Count number of bytes for the first line of a file. + */ + std::string CountFirstLineBytes(const char* filename, int* n_bytes) { + auto reader = VirtualFileReader::Make(filename); + if (!reader->Init()) { + Log::Fatal("Could not open %s", filename); + } + std::stringstream str_buf; + char read_c; + size_t nread = reader->Read(&read_c, 1); + while (nread == 1) { + if (read_c == '\n' || read_c == '\r') { + break; + } + str_buf << read_c; + ++(*n_bytes); + nread = reader->Read(&read_c, 1); + } + if (read_c == '\r') { + reader->Read(&read_c, 1); + ++(*n_bytes); + } + if (read_c == '\n') { + reader->Read(&read_c, 1); + ++(*n_bytes); + } + return str_buf.str(); + } + /*! * \brief Clear cached data */ inline void Clear() { @@ -85,11 +103,12 @@ class TextReader { */ inline std::vector& Lines() { return lines_; } - INDEX_T ReadAllAndProcess(const std::function& process_fun) { + INDEX_T ReadAllAndProcessFile(const char* filename, const std::function& process_fun, INDEX_T& total_cnt) { + Log::Debug("ReadAllAndProcessFile %s", filename); last_line_ = ""; - INDEX_T total_cnt = 0; size_t bytes_read = 0; - PipelineReader::Read(filename_, skip_bytes_, + + PipelineReader::Read(filename, skip_bytes_, [&process_fun, &bytes_read, &total_cnt, this] (const char* buffer_process, size_t read_cnt) { size_t cnt = 0; @@ -138,6 +157,15 @@ class TextReader { ++total_cnt; last_line_ = ""; } + Log::Debug("ReadAllAndProcessFile total_cnt %lu", total_cnt); + return total_cnt; + } + + INDEX_T ReadAllAndProcess(const std::function& process_fun) { + INDEX_T total_cnt = 0; + for (const auto& it : filename_) { + ReadAllAndProcessFile(it, process_fun, total_cnt); + } return total_cnt; } @@ -152,21 +180,33 @@ class TextReader { }); } - std::vector ReadContent(size_t* out_len) { - std::vector ret; + bool ReadContentFromFile(const char* filename, size_t* out_len, std::vector* ret) { *out_len = 0; - auto reader = VirtualFileReader::Make(filename_); + auto reader = VirtualFileReader::Make(filename); if (!reader->Init()) { - return ret; + return false; } const size_t buffer_size = 16 * 1024 * 1024; auto buffer_read = std::vector(buffer_size); size_t read_cnt = 0; do { read_cnt = reader->Read(buffer_read.data(), buffer_size); - ret.insert(ret.end(), buffer_read.begin(), buffer_read.begin() + read_cnt); + ret->insert(ret->end(), buffer_read.begin(), buffer_read.begin() + read_cnt); *out_len += read_cnt; } while (read_cnt > 0); + return true; + } + + std::vector ReadContent(size_t* out_len) { + std::vector ret; + for (const auto& it : filename_) { + size_t read_cnt = 0; + Log::Debug("ReadContentFromFile %s", it); + if (!ReadContentFromFile(it, &read_cnt, &ret)) { + break; + } + *out_len += read_cnt; + } return ret; } @@ -235,12 +275,13 @@ class TextReader { }); } - INDEX_T ReadAllAndProcessParallelWithFilter(const std::function&)>& process_fun, const std::function& filter_fun) { + INDEX_T ReadAllAndProcessParallelWithFilterOne(const char* filename, + const std::function&)>& process_fun, + const std::function& filter_fun, + INDEX_T& total_cnt, size_t& bytes_read, INDEX_T& used_cnt) { + Log::Debug("ReadAllAndProcessParallelWithFilterOne %s", filename); last_line_ = ""; - INDEX_T total_cnt = 0; - size_t bytes_read = 0; - INDEX_T used_cnt = 0; - PipelineReader::Read(filename_, skip_bytes_, + PipelineReader::Read(filename, skip_bytes_, [&process_fun, &filter_fun, &total_cnt, &bytes_read, &used_cnt, this] (const char* buffer_process, size_t read_cnt) { size_t cnt = 0; @@ -306,6 +347,17 @@ class TextReader { return total_cnt; } + INDEX_T ReadAllAndProcessParallelWithFilter(const std::function&)>& process_fun, const std::function& filter_fun) { + INDEX_T total_cnt = 0; + size_t bytes_read = 0; + INDEX_T used_cnt = 0; + for (const auto& it : filename_) { + ReadAllAndProcessParallelWithFilterOne(it, process_fun, filter_fun, total_cnt, bytes_read, used_cnt); + } + Log::Debug("ReadAllAndProcessParallelWithFilter total_cnt %lu", total_cnt); + return total_cnt; + } + INDEX_T ReadAllAndProcessParallel(const std::function&)>& process_fun) { return ReadAllAndProcessParallelWithFilter(process_fun, [](INDEX_T, INDEX_T) { return true; }); } @@ -323,7 +375,7 @@ class TextReader { private: /*! \brief Filename of text data */ - const char* filename_; + std::vector filename_; /*! \brief Cache the read text data */ std::vector lines_; /*! \brief Buffer for last line */ diff --git a/src/application/application.cpp b/src/application/application.cpp index 62583db72b6c..cafe61340d4c 100644 --- a/src/application/application.cpp +++ b/src/application/application.cpp @@ -101,16 +101,23 @@ void Application::LoadData() { } Log::Debug("Loading train file..."); + std::vector train_data; + for (auto it = config_.data.begin(); it != config_.data.end(); ++it) { + train_data.push_back(it->c_str()); + } DatasetLoader dataset_loader(config_, predict_fun, - config_.num_class, config_.data.c_str()); + config_.num_class, train_data); // load Training data if (config_.is_data_based_parallel) { // load data for parallel training - train_data_.reset(dataset_loader.LoadFromFile(config_.data.c_str(), + if (!train_data.empty()) { + Log::Fatal("parallel training with multiple train data have not been tested!"); + } + train_data_.reset(dataset_loader.LoadFromFile(train_data, Network::rank(), Network::num_machines())); } else { // load data for single machine - train_data_.reset(dataset_loader.LoadFromFile(config_.data.c_str(), 0, 1)); + train_data_.reset(dataset_loader.LoadFromFile(train_data, 0, 1)); } // need save binary file if (config_.save_binary) { @@ -160,8 +167,8 @@ void Application::LoadData() { } auto end_time = std::chrono::high_resolution_clock::now(); // output used time on each iteration - Log::Info("Finished loading data in %f seconds", - std::chrono::duration(end_time - start_time) * 1e-3); + Log::Info("Finished loading data in %f seconds, number of train data %d", + std::chrono::duration(end_time - start_time) * 1e-3, train_data_->num_data()); } void Application::InitTrain() { @@ -218,10 +225,14 @@ void Application::Train() { } void Application::Predict() { + if (config_.data.size() > 1) { + Log::Fatal("Predict supports only a single data file."); + } + const char* data = config_.data[0].c_str(); if (config_.task == TaskType::KRefitTree) { // create predictor Predictor predictor(boosting_.get(), 0, -1, false, true, false, false, 1, 1); - predictor.Predict(config_.data.c_str(), config_.output_result.c_str(), config_.header, config_.predict_disable_shape_check); + predictor.Predict(data, config_.output_result.c_str(), config_.header, config_.predict_disable_shape_check); TextReader result_reader(config_.output_result.c_str(), false); result_reader.ReadAllLines(); std::vector> pred_leaf(result_reader.Lines().size()); @@ -232,8 +243,8 @@ void Application::Predict() { result_reader.Lines()[i].clear(); } DatasetLoader dataset_loader(config_, nullptr, - config_.num_class, config_.data.c_str()); - train_data_.reset(dataset_loader.LoadFromFile(config_.data.c_str(), 0, 1)); + config_.num_class, data); + train_data_.reset(dataset_loader.LoadFromFile(data, 0, 1)); train_metric_.clear(); objective_fun_.reset(ObjectiveFunction::CreateObjectiveFunction(config_.objective, config_)); @@ -250,7 +261,7 @@ void Application::Predict() { config_.predict_leaf_index, config_.predict_contrib, config_.pred_early_stop, config_.pred_early_stop_freq, config_.pred_early_stop_margin); - predictor.Predict(config_.data.c_str(), + predictor.Predict(data, config_.output_result.c_str(), config_.header, config_.predict_disable_shape_check); Log::Info("Finished prediction"); } diff --git a/src/io/config.cpp b/src/io/config.cpp index dc04cb972d23..c43e3b9f31a2 100644 --- a/src/io/config.cpp +++ b/src/io/config.cpp @@ -226,8 +226,14 @@ void Config::Set(const std::unordered_map& params) { std::sort(eval_at.begin(), eval_at.end()); std::vector new_valid; + + std::set data_set; + for (size_t i = 0; i < data.size(); ++i) { + data_set.insert(data[i]); + } + for (size_t i = 0; i < valid.size(); ++i) { - if (valid[i] != data) { + if (data_set.count(valid[i]) == 0) { // Only push the non-training data new_valid.push_back(valid[i]); } else { diff --git a/src/io/config_auto.cpp b/src/io/config_auto.cpp index 06c53e84268a..9e07093175f3 100644 --- a/src/io/config_auto.cpp +++ b/src/io/config_auto.cpp @@ -308,7 +308,9 @@ void Config::GetMembersFromString(const std::unordered_map& filenames) + :config_(io_config), random_(config_.data_random_seed), predict_fun_(predict_fun), num_class_(num_class) { label_idx_ = 0; weight_idx_ = NO_SPECIFIC; group_idx_ = NO_SPECIFIC; - SetHeader(filename); + SetHeader(filenames[0]); store_raw_ = false; if (io_config.linear_tree) { store_raw_ = true; } } +DatasetLoader::DatasetLoader(const Config& io_config, const PredictFunction& predict_fun, int num_class, const char* filename) + : DatasetLoader(io_config, predict_fun, num_class, std::vector{filename}) { +} + DatasetLoader::~DatasetLoader() { } @@ -179,7 +183,7 @@ void CheckSampleSize(size_t sample_cnt, size_t num_data) { } } -Dataset* DatasetLoader::LoadFromFile(const char* filename, int rank, int num_machines) { +Dataset* DatasetLoader::LoadFromFile(const std::vector& filenames, int rank, int num_machines) { // don't support query id in data file when training in parallel if (num_machines > 1 && !config_.pre_partition) { if (group_idx_ > 0) { @@ -193,19 +197,21 @@ Dataset* DatasetLoader::LoadFromFile(const char* filename, int rank, int num_mac } data_size_t num_global_data = 0; std::vector used_data_indices; - auto bin_filename = CheckCanLoadFromBin(filename); + // When have multiple filenames, the saved binary uses first filename as its prefix. + auto bin_filename = CheckCanLoadFromBin(filenames[0]); bool is_load_from_binary = false; if (bin_filename.size() == 0) { - auto parser = std::unique_ptr(Parser::CreateParser(filename, config_.header, 0, label_idx_)); + auto parser = std::unique_ptr(Parser::CreateParser(filenames[0], config_.header, 0, label_idx_)); if (parser == nullptr) { - Log::Fatal("Could not recognize data format of %s", filename); + Log::Fatal("Could not recognize data format of %s", filenames[0]); } - dataset->data_filename_ = filename; + dataset->data_filename_ = filenames; dataset->label_idx_ = label_idx_; - dataset->metadata_.Init(filename); + // TODO support loading metadata from multiple files. + dataset->metadata_.Init(filenames[0]); if (!config_.two_round) { // read data to memory - auto text_data = LoadTextDataToMemory(filename, dataset->metadata_, rank, num_machines, &num_global_data, &used_data_indices); + auto text_data = LoadTextDataToMemory(filenames, dataset->metadata_, rank, num_machines, &num_global_data, &used_data_indices); dataset->num_data_ = static_cast(text_data.size()); // sample data auto sample_data = SampleTextDataFromMemory(text_data); @@ -223,7 +229,7 @@ Dataset* DatasetLoader::LoadFromFile(const char* filename, int rank, int num_mac text_data.clear(); } else { // sample data from file - auto sample_data = SampleTextDataFromFile(filename, dataset->metadata_, rank, num_machines, &num_global_data, &used_data_indices); + auto sample_data = SampleTextDataFromFile(filenames, dataset->metadata_, rank, num_machines, &num_global_data, &used_data_indices); if (used_data_indices.size() > 0) { dataset->num_data_ = static_cast(used_data_indices.size()); } else { @@ -240,13 +246,13 @@ Dataset* DatasetLoader::LoadFromFile(const char* filename, int rank, int num_mac dataset->metadata_.Init(dataset->num_data_, weight_idx_, group_idx_); Log::Info("Making second pass..."); // extract features - ExtractFeaturesFromFile(filename, parser.get(), used_data_indices, dataset.get()); + ExtractFeaturesFromFile(filenames, parser.get(), used_data_indices, dataset.get()); } } else { // load data from binary file is_load_from_binary = true; Log::Info("Load from binary file %s", bin_filename.c_str()); - dataset.reset(LoadFromBinFile(filename, bin_filename.c_str(), rank, num_machines, &num_global_data, &used_data_indices)); + dataset.reset(LoadFromBinFile(filenames, bin_filename.c_str(), rank, num_machines, &num_global_data, &used_data_indices)); } // check meta data dataset->metadata_.CheckOrPartition(num_global_data, used_data_indices); @@ -271,7 +277,7 @@ Dataset* DatasetLoader::LoadFromFileAlignWithOtherDataset(const char* filename, if (parser == nullptr) { Log::Fatal("Could not recognize data format of %s", filename); } - dataset->data_filename_ = filename; + dataset->data_filename_ = {filename}; dataset->label_idx_ = label_idx_; dataset->metadata_.Init(filename); if (!config_.two_round) { @@ -303,7 +309,7 @@ Dataset* DatasetLoader::LoadFromFileAlignWithOtherDataset(const char* filename, } } else { // load data from binary file - dataset.reset(LoadFromBinFile(filename, bin_filename.c_str(), 0, 1, &num_global_data, &used_data_indices)); + dataset.reset(LoadFromBinFile({filename}, bin_filename.c_str(), 0, 1, &num_global_data, &used_data_indices)); } // not need to check validation data // check meta data @@ -311,9 +317,10 @@ Dataset* DatasetLoader::LoadFromFileAlignWithOtherDataset(const char* filename, return dataset.release(); } -Dataset* DatasetLoader::LoadFromBinFile(const char* data_filename, const char* bin_filename, +Dataset* DatasetLoader::LoadFromBinFile(const std::vector& data_filename, const char* bin_filename, int rank, int num_machines, int* num_global_data, std::vector* used_data_indices) { + // We save a single binary file even when given multiple data files. auto dataset = std::unique_ptr(new Dataset()); auto reader = VirtualFileReader::Make(bin_filename); dataset->data_filename_ = data_filename; @@ -777,7 +784,7 @@ Dataset* DatasetLoader::ConstructFromSampleData(double** sample_values, void DatasetLoader::CheckDataset(const Dataset* dataset, bool is_load_from_binary) { if (dataset->num_data_ <= 0) { - Log::Fatal("Data file %s is empty", dataset->data_filename_.c_str()); + Log::Fatal("Data file %s is empty", dataset->data_filename_[0]); } if (dataset->feature_names_.size() != static_cast(dataset->num_total_features_)) { Log::Fatal("Size of feature name error, should be %d, got %d", dataset->num_total_features_, @@ -838,10 +845,10 @@ void DatasetLoader::CheckDataset(const Dataset* dataset, bool is_load_from_binar } } -std::vector DatasetLoader::LoadTextDataToMemory(const char* filename, const Metadata& metadata, +std::vector DatasetLoader::LoadTextDataToMemory(const std::vector& filenames, const Metadata& metadata, int rank, int num_machines, int* num_global_data, std::vector* used_data_indices) { - TextReader text_reader(filename, config_.header, config_.file_load_progress_interval_bytes); + TextReader text_reader(filenames, config_.header, config_.file_load_progress_interval_bytes); used_data_indices->clear(); if (num_machines == 1 || config_.pre_partition) { // read all lines @@ -860,6 +867,7 @@ std::vector DatasetLoader::LoadTextDataToMemory(const char* filenam } }, used_data_indices); } else { + // if contain query data, minimal sample unit is one query data_size_t num_queries = metadata.num_queries(); data_size_t qid = -1; @@ -900,7 +908,7 @@ std::vector DatasetLoader::SampleTextDataFromMemory(const std::vect return out; } -std::vector DatasetLoader::SampleTextDataFromFile(const char* filename, const Metadata& metadata, +std::vector DatasetLoader::SampleTextDataFromFile(const std::vector& filename, const Metadata& metadata, int rank, int num_machines, int* num_global_data, std::vector* used_data_indices) { const data_size_t sample_cnt = static_cast(config_.bin_construct_sample_cnt); @@ -1257,7 +1265,7 @@ void DatasetLoader::ExtractFeaturesFromMemory(std::vector* text_dat } /*! \brief Extract local features from file */ -void DatasetLoader::ExtractFeaturesFromFile(const char* filename, const Parser* parser, +void DatasetLoader::ExtractFeaturesFromFile(const std::vector& filename, const Parser* parser, const std::vector& used_data_indices, Dataset* dataset) { std::vector init_score; if (predict_fun_ != nullptr) { From 068881413311eaa3563c563eb3ebed4d9a6dc405 Mon Sep 17 00:00:00 2001 From: Chen Yufei Date: Tue, 29 Dec 2020 10:55:25 +0800 Subject: [PATCH 2/3] Print progress and read speed for every GB of data. --- include/LightGBM/config.h | 2 +- include/LightGBM/utils/text_reader.h | 43 +++++++++++++++++++++------- src/application/predictor.hpp | 2 +- 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/include/LightGBM/config.h b/include/LightGBM/config.h index 7c0c6faafe6d..c686684d2454 100644 --- a/include/LightGBM/config.h +++ b/include/LightGBM/config.h @@ -1013,7 +1013,7 @@ struct Config { #pragma endregion - size_t file_load_progress_interval_bytes = size_t(10) * 1024 * 1024 * 1024; + static constexpr size_t file_load_progress_interval_bytes = size_t(1) * 1024 * 1024 * 1024; bool is_parallel = false; bool is_data_based_parallel = false; diff --git a/include/LightGBM/utils/text_reader.h b/include/LightGBM/utils/text_reader.h index 3c2dae36ad94..de58fe0236be 100644 --- a/include/LightGBM/utils/text_reader.h +++ b/include/LightGBM/utils/text_reader.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -103,13 +104,14 @@ class TextReader { */ inline std::vector& Lines() { return lines_; } - INDEX_T ReadAllAndProcessFile(const char* filename, const std::function& process_fun, INDEX_T& total_cnt) { - Log::Debug("ReadAllAndProcessFile %s", filename); + INDEX_T ReadAllAndProcessFile(const char* filename, const std::function& process_fun, + INDEX_T& total_cnt, size_t& bytes_read, + std::chrono::time_point& start_time, + std::chrono::time_point& prev_time) { last_line_ = ""; - size_t bytes_read = 0; PipelineReader::Read(filename, skip_bytes_, - [&process_fun, &bytes_read, &total_cnt, this] + [&process_fun, &bytes_read, &total_cnt, &filename, &start_time, &prev_time, this] (const char* buffer_process, size_t read_cnt) { size_t cnt = 0; size_t i = 0; @@ -145,7 +147,13 @@ class TextReader { size_t prev_bytes_read = bytes_read; bytes_read += read_cnt; if (prev_bytes_read / read_progress_interval_bytes_ < bytes_read / read_progress_interval_bytes_) { - Log::Debug("Read %.1f GBs from %s.", 1.0 * bytes_read / kGbs, filename_); + auto now = std::chrono::high_resolution_clock::now(); + auto total_time = std::chrono::duration(now - start_time) * 1e-3; + auto interval_time = std::chrono::duration(now - prev_time) * 1e-3; + Log::Info("Read %.1f GBs in %.1f seconds (from %s), last interval speed %.1f MB/s", + 1.0 * bytes_read / kGbs, total_time, filename, + 1.0 * read_progress_interval_bytes_ / kGbs * 1024.0 / interval_time.count()); + prev_time = now; } return cnt; @@ -157,14 +165,17 @@ class TextReader { ++total_cnt; last_line_ = ""; } - Log::Debug("ReadAllAndProcessFile total_cnt %lu", total_cnt); + Log::Info("ReadAllAndProcessFile %s total_cnt %lu, bytes_read %lu", filename, total_cnt, bytes_read); return total_cnt; } INDEX_T ReadAllAndProcess(const std::function& process_fun) { INDEX_T total_cnt = 0; + size_t bytes_read = 0; + auto start_time = std::chrono::high_resolution_clock::now(); + auto prev_time = start_time; for (const auto& it : filename_) { - ReadAllAndProcessFile(it, process_fun, total_cnt); + ReadAllAndProcessFile(it, process_fun, total_cnt, bytes_read, start_time, prev_time); } return total_cnt; } @@ -278,11 +289,13 @@ class TextReader { INDEX_T ReadAllAndProcessParallelWithFilterOne(const char* filename, const std::function&)>& process_fun, const std::function& filter_fun, - INDEX_T& total_cnt, size_t& bytes_read, INDEX_T& used_cnt) { + INDEX_T& total_cnt, size_t& bytes_read, INDEX_T& used_cnt, + std::chrono::time_point& start_time, + std::chrono::time_point& prev_time) { Log::Debug("ReadAllAndProcessParallelWithFilterOne %s", filename); last_line_ = ""; PipelineReader::Read(filename, skip_bytes_, - [&process_fun, &filter_fun, &total_cnt, &bytes_read, &used_cnt, this] + [&process_fun, &filter_fun, &total_cnt, &bytes_read, &used_cnt, &filename, &start_time, &prev_time, this] (const char* buffer_process, size_t read_cnt) { size_t cnt = 0; size_t i = 0; @@ -327,7 +340,13 @@ class TextReader { size_t prev_bytes_read = bytes_read; bytes_read += read_cnt; if (prev_bytes_read / read_progress_interval_bytes_ < bytes_read / read_progress_interval_bytes_) { - Log::Debug("Read %.1f GBs from %s.", 1.0 * bytes_read / kGbs, filename_); + auto now = std::chrono::high_resolution_clock::now(); + auto total_time = std::chrono::duration(now - start_time) * 1e-3; + auto interval_time = std::chrono::duration(now - prev_time) * 1e-3; + Log::Info("Read %.1f GBs in %.1f seconds (from %s), last interval speed %.1f MB/s", + 1.0 * bytes_read / kGbs, total_time, + 1.0 * read_progress_interval_bytes_ / kGbs * 1024.0 / interval_time.count(), filename); + prev_time = now; } return cnt; @@ -351,8 +370,10 @@ class TextReader { INDEX_T total_cnt = 0; size_t bytes_read = 0; INDEX_T used_cnt = 0; + auto start_time = std::chrono::high_resolution_clock::now(); + auto prev_time = start_time; for (const auto& it : filename_) { - ReadAllAndProcessParallelWithFilterOne(it, process_fun, filter_fun, total_cnt, bytes_read, used_cnt); + ReadAllAndProcessParallelWithFilterOne(it, process_fun, filter_fun, total_cnt, bytes_read, used_cnt, start_time, prev_time); } Log::Debug("ReadAllAndProcessParallelWithFilter total_cnt %lu", total_cnt); return total_cnt; diff --git a/src/application/predictor.hpp b/src/application/predictor.hpp index b2f7dadc2546..aded5bf253e0 100644 --- a/src/application/predictor.hpp +++ b/src/application/predictor.hpp @@ -172,7 +172,7 @@ class Predictor { Log::Fatal("The number of features in data (%d) is not the same as it was in training data (%d).\n" \ "You can set ``predict_disable_shape_check=true`` to discard this error, but please be aware what you are doing.", parser->NumFeatures(), boosting_->MaxFeatureIdx() + 1); } - TextReader predict_data_reader(data_filename, header); + TextReader predict_data_reader(data_filename, header, Config::file_load_progress_interval_bytes); std::vector feature_remapper(parser->NumFeatures(), -1); bool need_adjust = false; if (header) { From ee5164d22b66be0afac4b31e225800fd81138f78 Mon Sep 17 00:00:00 2001 From: Chen Yufei Date: Thu, 4 Feb 2021 13:38:07 +0800 Subject: [PATCH 3/3] Fix lint complaint. --- include/LightGBM/utils/text_reader.h | 46 ++++++++++++++-------------- src/io/dataset_loader.cpp | 3 +- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/include/LightGBM/utils/text_reader.h b/include/LightGBM/utils/text_reader.h index de58fe0236be..e5e73ceab618 100644 --- a/include/LightGBM/utils/text_reader.h +++ b/include/LightGBM/utils/text_reader.h @@ -105,13 +105,13 @@ class TextReader { inline std::vector& Lines() { return lines_; } INDEX_T ReadAllAndProcessFile(const char* filename, const std::function& process_fun, - INDEX_T& total_cnt, size_t& bytes_read, + INDEX_T* total_cnt, size_t& bytes_read, std::chrono::time_point& start_time, std::chrono::time_point& prev_time) { last_line_ = ""; PipelineReader::Read(filename, skip_bytes_, - [&process_fun, &bytes_read, &total_cnt, &filename, &start_time, &prev_time, this] + [&process_fun, &bytes_read, total_cnt, &filename, &start_time, &prev_time, this] (const char* buffer_process, size_t read_cnt) { size_t cnt = 0; size_t i = 0; @@ -125,14 +125,14 @@ class TextReader { if (buffer_process[i] == '\n' || buffer_process[i] == '\r') { if (last_line_.size() > 0) { last_line_.append(buffer_process + last_i, i - last_i); - process_fun(total_cnt, last_line_.c_str(), last_line_.size()); + process_fun(*total_cnt, last_line_.c_str(), last_line_.size()); last_line_ = ""; } else { - process_fun(total_cnt, buffer_process + last_i, i - last_i); + process_fun(*total_cnt, buffer_process + last_i, i - last_i); } ++cnt; ++i; - ++total_cnt; + ++(*total_cnt); // skip end of line while ((buffer_process[i] == '\n' || buffer_process[i] == '\r') && i < read_cnt) { ++i; } last_i = i; @@ -161,12 +161,12 @@ class TextReader { // if last line of file doesn't contain end of line if (last_line_.size() > 0) { Log::Info("Warning: last line of %s has no end of line, still using this line", filename_); - process_fun(total_cnt, last_line_.c_str(), last_line_.size()); - ++total_cnt; + process_fun(*total_cnt, last_line_.c_str(), last_line_.size()); + ++(*total_cnt); last_line_ = ""; } - Log::Info("ReadAllAndProcessFile %s total_cnt %lu, bytes_read %lu", filename, total_cnt, bytes_read); - return total_cnt; + Log::Info("ReadAllAndProcessFile %s total_cnt %lu, bytes_read %lu", filename, *total_cnt, bytes_read); + return *total_cnt; } INDEX_T ReadAllAndProcess(const std::function& process_fun) { @@ -175,7 +175,7 @@ class TextReader { auto start_time = std::chrono::high_resolution_clock::now(); auto prev_time = start_time; for (const auto& it : filename_) { - ReadAllAndProcessFile(it, process_fun, total_cnt, bytes_read, start_time, prev_time); + ReadAllAndProcessFile(it, process_fun, &total_cnt, bytes_read, start_time, prev_time); } return total_cnt; } @@ -289,13 +289,13 @@ class TextReader { INDEX_T ReadAllAndProcessParallelWithFilterOne(const char* filename, const std::function&)>& process_fun, const std::function& filter_fun, - INDEX_T& total_cnt, size_t& bytes_read, INDEX_T& used_cnt, + INDEX_T* total_cnt, size_t* bytes_read, INDEX_T& used_cnt, std::chrono::time_point& start_time, std::chrono::time_point& prev_time) { Log::Debug("ReadAllAndProcessParallelWithFilterOne %s", filename); last_line_ = ""; PipelineReader::Read(filename, skip_bytes_, - [&process_fun, &filter_fun, &total_cnt, &bytes_read, &used_cnt, &filename, &start_time, &prev_time, this] + [&process_fun, &filter_fun, total_cnt, bytes_read, &used_cnt, &filename, &start_time, &prev_time, this] (const char* buffer_process, size_t read_cnt) { size_t cnt = 0; size_t i = 0; @@ -310,20 +310,20 @@ class TextReader { if (buffer_process[i] == '\n' || buffer_process[i] == '\r') { if (last_line_.size() > 0) { last_line_.append(buffer_process + last_i, i - last_i); - if (filter_fun(used_cnt, total_cnt)) { + if (filter_fun(used_cnt, *total_cnt)) { lines_.push_back(last_line_); ++used_cnt; } last_line_ = ""; } else { - if (filter_fun(used_cnt, total_cnt)) { + if (filter_fun(used_cnt, *total_cnt)) { lines_.emplace_back(buffer_process + last_i, i - last_i); ++used_cnt; } } ++cnt; ++i; - ++total_cnt; + ++(*total_cnt); // skip end of line while ((buffer_process[i] == '\n' || buffer_process[i] == '\r') && i < read_cnt) { ++i; } last_i = i; @@ -337,14 +337,14 @@ class TextReader { last_line_.append(buffer_process + last_i, read_cnt - last_i); } - size_t prev_bytes_read = bytes_read; - bytes_read += read_cnt; - if (prev_bytes_read / read_progress_interval_bytes_ < bytes_read / read_progress_interval_bytes_) { + size_t prev_bytes_read = *bytes_read; + *bytes_read += read_cnt; + if (prev_bytes_read / read_progress_interval_bytes_ < *bytes_read / read_progress_interval_bytes_) { auto now = std::chrono::high_resolution_clock::now(); auto total_time = std::chrono::duration(now - start_time) * 1e-3; auto interval_time = std::chrono::duration(now - prev_time) * 1e-3; Log::Info("Read %.1f GBs in %.1f seconds (from %s), last interval speed %.1f MB/s", - 1.0 * bytes_read / kGbs, total_time, + 1.0 * (*bytes_read) / kGbs, total_time, 1.0 * read_progress_interval_bytes_ / kGbs * 1024.0 / interval_time.count(), filename); prev_time = now; } @@ -354,16 +354,16 @@ class TextReader { // if last line of file doesn't contain end of line if (last_line_.size() > 0) { Log::Info("Warning: last line of %s has no end of line, still using this line", filename_); - if (filter_fun(used_cnt, total_cnt)) { + if (filter_fun(used_cnt, *total_cnt)) { lines_.push_back(last_line_); process_fun(used_cnt, lines_); } lines_.clear(); - ++total_cnt; + ++(*total_cnt); ++used_cnt; last_line_ = ""; } - return total_cnt; + return *total_cnt; } INDEX_T ReadAllAndProcessParallelWithFilter(const std::function&)>& process_fun, const std::function& filter_fun) { @@ -373,7 +373,7 @@ class TextReader { auto start_time = std::chrono::high_resolution_clock::now(); auto prev_time = start_time; for (const auto& it : filename_) { - ReadAllAndProcessParallelWithFilterOne(it, process_fun, filter_fun, total_cnt, bytes_read, used_cnt, start_time, prev_time); + ReadAllAndProcessParallelWithFilterOne(it, process_fun, filter_fun, &total_cnt, &bytes_read, used_cnt, start_time, prev_time); } Log::Debug("ReadAllAndProcessParallelWithFilter total_cnt %lu", total_cnt); return total_cnt; diff --git a/src/io/dataset_loader.cpp b/src/io/dataset_loader.cpp index 439c4ff991e4..6881376f06c8 100644 --- a/src/io/dataset_loader.cpp +++ b/src/io/dataset_loader.cpp @@ -207,7 +207,7 @@ Dataset* DatasetLoader::LoadFromFile(const std::vector& filenames, } dataset->data_filename_ = filenames; dataset->label_idx_ = label_idx_; - // TODO support loading metadata from multiple files. + // XXX load metadata only from the first input. dataset->metadata_.Init(filenames[0]); if (!config_.two_round) { // read data to memory @@ -867,7 +867,6 @@ std::vector DatasetLoader::LoadTextDataToMemory(const std::vector