From 3a5944057886828a767a47a056c926058e9c7d7c Mon Sep 17 00:00:00 2001
From: binwei
Date: Sat, 30 Apr 2022 16:32:25 +0800
Subject: [PATCH 1/3] Merge master and branch shuffle_opt_fillbyreducer to
 submit a PR upstream. Implement fill by reducer.

---
 native-sql-engine/cpp/CMakeLists.txt          |   3 +
 .../src/benchmarks/shuffle_split_benchmark.cc | 499 ++++++++++--------
 native-sql-engine/cpp/src/shuffle/splitter.cc | 167 +++++-
 native-sql-engine/cpp/src/shuffle/splitter.h  |   4 +
 4 files changed, 436 insertions(+), 237 deletions(-)

diff --git a/native-sql-engine/cpp/CMakeLists.txt b/native-sql-engine/cpp/CMakeLists.txt
index a1301fd1d..fe7e989ee 100644
--- a/native-sql-engine/cpp/CMakeLists.txt
+++ b/native-sql-engine/cpp/CMakeLists.txt
@@ -1,6 +1,9 @@
 cmake_minimum_required(VERSION 3.16)
 project(spark_columnar_plugin)
 
+#add_definitions(-DSKIPWRITE -DSKIPCOMPRESS -DPROCESSROW)
+add_definitions(-DPROCESSROW)
+
 #add_compile_options(-g)
 
 set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
diff --git a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc
index cd81ef877..ec1416641 100644
--- a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc
+++ b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc
@@ -41,24 +41,12 @@ namespace shuffle {
 const int batch_buffer_size = 32768;
 const int split_buffer_size = 8192;
 
-class BenchmarkShuffleSplit : public ::benchmark::Fixture {
+class BenchmarkShuffleSplit {
  public:
-  BenchmarkShuffleSplit() {
-    file_name =
-        "/mnt/DP_disk1/lineitem/"
-        "part-00025-356249a2-c285-42b9-8a18-5b10be61e0c4-c000.snappy.parquet";
-
+  BenchmarkShuffleSplit(std::string file_name) {
     GetRecordBatchReader(file_name);
-    std::cout << schema->ToString() << std::endl;
-    const auto& fields = schema->fields();
-    for (const auto& field : fields) {
-      if (field->name() == "l_orderkey") {
-        auto node = gandiva::TreeExprBuilder::MakeField(field);
-        expr_vector.push_back(gandiva::TreeExprBuilder::MakeExpression(
-            std::move(node), arrow::field("res_" + field->name(), field->type())));
-      }
-    }
   }
+
   void GetRecordBatchReader(const std::string& input_file) {
     std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
     std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
@@ -89,11 +77,97 @@ class BenchmarkShuffleSplit : public ::benchmark::Fixture {
     for (int i = 0; i < num_columns; ++i) {
       column_indices.push_back(i);
     }
+    const auto& fields = schema->fields();
+    for (const auto& field : fields) {
+      if (field->name() == "l_orderkey") {
+        auto node = gandiva::TreeExprBuilder::MakeField(field);
+        expr_vector.push_back(gandiva::TreeExprBuilder::MakeExpression(
+            std::move(node), arrow::field("res_" + field->name(), field->type())));
+      }
+    }
   }
 
-  void SetUp(const ::benchmark::State& state) {}
+  void operator()(benchmark::State& state) {
+    SetCPU(state.thread_index());
+    arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1);
+
+    const int num_partitions = state.range(0);
+
+    auto options = SplitOptions::Defaults();
+    options.compression_type = compression_type;
+    options.buffer_size = split_buffer_size;
+    options.buffered_write = true;
+    options.offheap_per_task = 128 * 1024 * 1024 * 1024L;
+    options.prefer_spill = true;
+    options.write_schema = false;
+
+    std::shared_ptr<Splitter> splitter;
+    int64_t elapse_read = 0;
+    int64_t num_batches = 0;
+    int64_t num_rows = 0;
+    int64_t split_time = 0;
+
+    Do_Split(splitter, elapse_read, num_batches, num_rows, split_time,
+             num_partitions, options, state);
+
+    auto fs =
        std::make_shared<arrow::fs::LocalFileSystem>();
+    fs->DeleteFile(splitter->DataFile());
+
+    state.SetBytesProcessed(int64_t(splitter->RawPartitionBytes()));
+
+    state.counters["rowgroups"] =
+        benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1000);
+    state.counters["columns"] =
+        benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1000);
+    state.counters["batches"] = benchmark::Counter(
+        num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
+    state.counters["num_rows"] = benchmark::Counter(
+        num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
+    state.counters["num_partitions"] = benchmark::Counter(
+        num_partitions, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
+    state.counters["batch_buffer_size"] =
+        benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1024);
+    state.counters["split_buffer_size"] =
+        benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1024);
+
+    state.counters["bytes_spilled"] =
+        benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1024);
+    state.counters["bytes_written"] =
+        benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1024);
+    state.counters["bytes_raw"] =
+        benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1024);
+    state.counters["bytes_spilled"] =
+        benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1024);
+
+    state.counters["parquet_parse"] = benchmark::Counter(
+        elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
+    state.counters["compute_pid_time"] =
+        benchmark::Counter(splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1000);
+    state.counters["write_time"] =
+        benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1000);
+    state.counters["spill_time"] =
+        benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1000);
+    state.counters["compress_time"] =
+        benchmark::Counter(splitter->TotalCompressTime(), benchmark::Counter::kAvgThreads,
+                           benchmark::Counter::OneK::kIs1000);
+
+    split_time = split_time - splitter->TotalSpillTime() - splitter->TotalComputePidTime() -
+                 splitter->TotalCompressTime() - splitter->TotalWriteTime();
+    state.counters["split_time"] = benchmark::Counter(
+        split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
+  }
 
-  void TearDown(const ::benchmark::State& state) {}
 
 protected:
  long SetCPU(uint32_t cpuindex) {
@@ -102,9 +176,9 @@ class BenchmarkShuffleSplit : public ::benchmark::Fixture {
    CPU_SET(cpuindex, &cs);
    return sched_setaffinity(0, sizeof(cs), &cs);
  }
-  virtual void Do_Split(const std::shared_ptr<Splitter>& splitter, int64_t& elapse_read,
+  virtual void Do_Split(std::shared_ptr<Splitter>& splitter, int64_t& elapse_read,
                        int64_t& num_batches, int64_t& num_rows, int64_t& split_time,
-                        benchmark::State& state) {}
+                        const int num_partitions, SplitOptions options, benchmark::State& state) {}
 
 protected:
  std::string file_name;
@@ -116,232 +190,124 @@ class BenchmarkShuffleSplit : public ::benchmark::Fixture {
 parquet::ArrowReaderProperties properties;
 };
 
-BENCHMARK_DEFINE_F(BenchmarkShuffleSplit, CacheScan)(benchmark::State& state) {
-  SetCPU(state.thread_index());
-  arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1);
+class BenchmarkShuffleSplit_CacheScan_Benchmark: public BenchmarkShuffleSplit{
+public:
+BenchmarkShuffleSplit_CacheScan_Benchmark(std::string filename):BenchmarkShuffleSplit(filename){}
 
-  const int num_partitions = state.range(0);
+protected:
+  void Do_Split(std::shared_ptr<Splitter>& splitter, int64_t& elapse_read,
+                int64_t& num_batches, int64_t& num_rows, int64_t& split_time,
+                const int num_partitions, SplitOptions options, benchmark::State& state) {
+
+    std::vector<int> local_column_indices;
+    local_column_indices.push_back(0);
+    local_column_indices.push_back(1);
+    local_column_indices.push_back(2);
+    local_column_indices.push_back(4);
+    local_column_indices.push_back(5);
+    local_column_indices.push_back(6);
+    local_column_indices.push_back(7);
+
+    std::shared_ptr<arrow::Schema> local_schema;
+    local_schema = std::make_shared<arrow::Schema>(*schema.get());
+
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(15));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(14));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(13));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(12));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(11));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(10));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(9));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(8));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(3));
+
+    if(state.thread_index() == 0)
+      std::cout << local_schema->ToString() << std::endl;
 
-  auto options = SplitOptions::Defaults();
-  options.compression_type = compression_type;
-  options.buffer_size = split_buffer_size;
-  options.buffered_write = true;
-  options.offheap_per_task = 128 * 1024 * 1024 * 1024L;
-  options.prefer_spill = true;
-  options.write_schema = false;
+    ARROW_ASSIGN_OR_THROW(
+        splitter, Splitter::Make("rr", local_schema, num_partitions, options));
+
+    std::shared_ptr<arrow::RecordBatch> record_batch;
 
-  std::shared_ptr<Splitter> splitter;
+    std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
+    std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
+    ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
+        arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties,
+        &parquet_reader));
 
-  if (!expr_vector.empty()) {
-    ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions,
-                                                   expr_vector, std::move(options)));
-  } else {
-    ARROW_ASSIGN_OR_THROW(
-        splitter, Splitter::Make("rr", schema, num_partitions, std::move(options)));
-  }
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+    ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, local_column_indices,
+                                                       &record_batch_reader));
+    do {
+      TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));
 
-  std::shared_ptr<arrow::RecordBatch> record_batch;
-  int64_t elapse_read = 0;
-  int64_t num_batches = 0;
-  int64_t num_rows = 0;
-  int64_t split_time = 0;
-
-  std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
-  std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
-  ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
-      arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties,
-      &parquet_reader));
-
-  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
-  ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, column_indices,
-                                                     &record_batch_reader));
-  do {
-
    TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));
-
-    if (record_batch) {
-      batches.push_back(record_batch);
-      num_batches += 1;
-      num_rows += record_batch->num_rows();
+      if (record_batch) {
+        batches.push_back(record_batch);
+        num_batches += 1;
+        num_rows += record_batch->num_rows();
+      }
+    } while (record_batch);
+    std::cout << "parquet parse done elapsed time " << elapse_read/1000000 << " ms " << std::endl;
+    std::cout << "batches = " << num_batches << " rows = " << num_rows << std::endl;
+
+    for (auto _ : state) {
+      for_each(batches.begin(), batches.end(),
+               [&splitter, &split_time](std::shared_ptr<arrow::RecordBatch>& record_batch) {
+                 TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch));
+               });
     }
-  } while (record_batch);
 
-  for (auto _ : state) {
-    for_each(batches.begin(), batches.end(),
-             [&splitter, &split_time](std::shared_ptr<arrow::RecordBatch>& record_batch) {
-               TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch));
-             });
+    TIME_NANO_OR_THROW(split_time, splitter->Stop());
   }
-  TIME_NANO_OR_THROW(split_time, splitter->Stop());
-
-  auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
-  fs->DeleteFile(splitter->DataFile());
-
-  state.SetBytesProcessed(int64_t(splitter->RawPartitionBytes()));
-
-  state.counters["rowgroups"] =
-      benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-  state.counters["columns"] =
-      benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-  state.counters["batches"] = benchmark::Counter(
-      num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-  state.counters["num_rows"] = benchmark::Counter(
-      num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-  state.counters["num_partitions"] = benchmark::Counter(
-      num_partitions, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-  state.counters["batch_buffer_size"] =
-      benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-  state.counters["split_buffer_size"] =
-      benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-
-  state.counters["bytes_spilled"] =
-      benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-  state.counters["bytes_written"] =
-      benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-  state.counters["bytes_raw"] =
-      benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-  state.counters["bytes_spilled"] =
-      benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-
-  state.counters["parquet_parse"] = benchmark::Counter(
-      elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-  state.counters["compute_pid_time"] =
-      benchmark::Counter(splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-  state.counters["write_time"] =
-      benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-  state.counters["spill_time"] =
-      benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-  state.counters["compress_time"] =
-      benchmark::Counter(splitter->TotalCompressTime(),
benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-
-  split_time = split_time - splitter->TotalSpillTime() - splitter->TotalComputePidTime() -
-               splitter->TotalCompressTime() - splitter->TotalWriteTime();
-  state.counters["split_time"] = benchmark::Counter(
-      split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-}
-
-BENCHMARK_DEFINE_F(BenchmarkShuffleSplit, IterateScan)(benchmark::State& state) {
-  SetCPU(state.thread_index());
-
-  arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1);
-
-  const int num_partitions = state.range(0);
-
-  auto options = SplitOptions::Defaults();
-  options.compression_type = compression_type;
-  options.buffer_size = split_buffer_size;
-  options.buffered_write = true;
-  options.offheap_per_task = 128 * 1024 * 1024 * 1024L;
-  options.prefer_spill = true;
-  options.write_schema = false;
-
-  std::shared_ptr<Splitter> splitter;
-
-  if (!expr_vector.empty()) {
-    ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions,
-                                                   expr_vector, std::move(options)));
-  } else {
-    ARROW_ASSIGN_OR_THROW(
-        splitter, Splitter::Make("rr", schema, num_partitions, std::move(options)));
-  }
-  int64_t elapse_read = 0;
-  int64_t num_batches = 0;
-  int64_t num_rows = 0;
-  int64_t split_time = 0;
+};
 
-  std::shared_ptr<arrow::RecordBatch> record_batch;
-  std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
-  std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
-  ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
-      arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties,
-      &parquet_reader));
+class BenchmarkShuffleSplit_IterateScan_Benchmark: public BenchmarkShuffleSplit{
+public:
+BenchmarkShuffleSplit_IterateScan_Benchmark(std::string filename):BenchmarkShuffleSplit(filename){}
 
-  for (auto _ : state) {
-    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
-    ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, column_indices,
-                                                       &record_batch_reader));
-    TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));
-    while (record_batch) {
-      num_batches += 1;
-      num_rows += record_batch->num_rows();
-      TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch));
+protected:
+  void Do_Split(std::shared_ptr<Splitter>& splitter, int64_t& elapse_read,
+                int64_t& num_batches, int64_t& num_rows, int64_t& split_time,
+                const int num_partitions, SplitOptions options, benchmark::State& state) {
+
+    if(state.thread_index() == 0)
+      std::cout << schema->ToString() << std::endl;
+
+    if (!expr_vector.empty()) {
+      ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions,
+                                                     expr_vector, std::move(options)));
+    } else {
+      ARROW_ASSIGN_OR_THROW(
+          splitter, Splitter::Make("rr", schema, num_partitions, std::move(options)));
+    }
+
+    std::shared_ptr<arrow::RecordBatch> record_batch;
+
+    std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
+    std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
+    ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
+        arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties,
+        &parquet_reader));
+
+    for (auto _ : state) {
+      std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+      ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, column_indices,
+                                                         &record_batch_reader));
      TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));
+      while (record_batch) {
+        num_batches += 1;
+        num_rows += record_batch->num_rows();
+        TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch));
+        TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));
+      }
     }
+
    TIME_NANO_OR_THROW(split_time, splitter->Stop());
  }
-  TIME_NANO_OR_THROW(split_time, splitter->Stop());
-
-  auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
-  fs->DeleteFile(splitter->DataFile());
-
-  state.SetBytesProcessed(int64_t(splitter->RawPartitionBytes()));
-
-  state.counters["rowgroups"] =
-      benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-  state.counters["columns"] =
-      benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-  state.counters["batches"] = benchmark::Counter(
-      num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-  state.counters["num_rows"] = benchmark::Counter(
-      num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-  state.counters["num_partitions"] = benchmark::Counter(
-      num_partitions, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-  state.counters["batch_buffer_size"] =
-      benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-  state.counters["split_buffer_size"] =
-      benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-
-  state.counters["bytes_spilled"] =
-      benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-  state.counters["bytes_written"] =
-      benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-  state.counters["bytes_raw"] =
-      benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-  state.counters["bytes_spilled"] =
-      benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1024);
-
-  state.counters["parquet_parse"] = benchmark::Counter(
-      elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-  state.counters["compute_pid_time"] =
-      benchmark::Counter(splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-  state.counters["write_time"] =
-      benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-  state.counters["spill_time"] =
-      benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-  state.counters["compress_time"] =
-      benchmark::Counter(splitter->TotalCompressTime(), benchmark::Counter::kAvgThreads,
-                         benchmark::Counter::OneK::kIs1000);
-
-  split_time = split_time - splitter->TotalSpillTime() - splitter->TotalComputePidTime() -
-               splitter->TotalCompressTime() - splitter->TotalWriteTime();
-  state.counters["split_time"] = benchmark::Counter(
-      split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-}
+};
 
 /*BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, CacheScan)->Iterations(1)
    ->Args({96*2, arrow::Compression::FASTPFOR})
@@ -370,14 +336,79 @@ BENCHMARK_DEFINE_F(BenchmarkShuffleSplit, IterateScan)(benchmark::State& state)
    ->Threads(16)
    ->Threads(24)
    ->Unit(benchmark::kSecond);*/
-BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, IterateScan)
-    ->Iterations(1)
-    ->Args({96 * 16, arrow::Compression::FASTPFOR})
-    ->Threads(24)
+/*BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, CacheScan)
+    ->Iterations(1000000)
+    ->Args({512, arrow::Compression::FASTPFOR})
+    ->Threads(1)
    ->ReportAggregatesOnly(false)
    ->MeasureProcessCPUTime()
-    ->Unit(benchmark::kSecond);
+    ->Unit(benchmark::kSecond);*/
 
 }  // namespace shuffle
 }  // namespace sparkcolumnarplugin
 
-BENCHMARK_MAIN();
+int main(int argc, char** argv) {
+
+  uint32_t iterations=1;
+  uint32_t partitions=512;
+  uint32_t threads=1;
+  std::string datafile;
+
+  for (int i=0;i<argc;i++) {
+    if (strcmp(argv[i], "--iterations") == 0) {
+      iterations = atol(argv[i+1]);
+    } else if (strcmp(argv[i], "--partitions") == 0) {
+      partitions = atol(argv[i+1]);
+    } else if (strcmp(argv[i], "--threads") == 0) {
+      threads = atol(argv[i+1]);
+    } else if (strcmp(argv[i], "--file") == 0) {
+      datafile = argv[i+1];
+    }
+  }
+
+  sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_CacheScan_Benchmark bck(datafile);
+
+  benchmark::RegisterBenchmark("BenchmarkShuffleSplit::CacheScan", bck)
+    ->Iterations(iterations)
+    ->Args({partitions, arrow::Compression::FASTPFOR})
+    ->Threads(threads)
+    ->ReportAggregatesOnly(false)
+    ->MeasureProcessCPUTime()
+    ->Unit(benchmark::kSecond);
+
+/* sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark bck(datafile);
+
+  benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
+    ->Iterations(1)
+    ->Args({96*2, arrow::Compression::FASTPFOR})
+    ->Args({96*4, arrow::Compression::FASTPFOR})
+    ->Args({96*8, arrow::Compression::FASTPFOR})
+    ->Args({96*16, arrow::Compression::FASTPFOR})
+    ->Args({96*32, arrow::Compression::FASTPFOR})
+    ->Threads(24)
+    ->Unit(benchmark::kSecond);
+
+  benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
+    ->Iterations(1)
+    ->Args({4096, arrow::Compression::FASTPFOR})
+    ->Threads(1)
+    ->Threads(2)
+    ->Threads(4)
+    ->Threads(8)
+    ->Threads(16)
+    ->Threads(24)
+    ->Unit(benchmark::kSecond);
+*/
+
+  benchmark::Initialize(&argc, argv);
+  benchmark::RunSpecifiedBenchmarks();
+  benchmark::Shutdown();
+}
\ No newline at end of file
diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc
index e739bd04f..798668dde 100644
--- a/native-sql-engine/cpp/src/shuffle/splitter.cc
+++ b/native-sql-engine/cpp/src/shuffle/splitter.cc
@@ -28,20 +28,45 @@
 #include <memory>
 #include <utility>
+#include <algorithm>
+#include <sstream>
+#include <cstring>
 #include "shuffle/utils.h"
 #include "utils/macros.h"
+#include <immintrin.h>
 
-#if defined(COLUMNAR_PLUGIN_USE_AVX512)
+/*#if defined(COLUMNAR_PLUGIN_USE_AVX512)
 #include <immintrin.h>
 #else
 #include <xmmintrin.h>
 #endif
+*/
 
 namespace sparkcolumnarplugin {
 namespace shuffle {
 using arrow::internal::checked_cast;
 
+
+
+template <typename T>
+std::string __m128i_toString(const __m128i var) {
+  std::stringstream sstr;
+  T values[16/sizeof(T)];
+  std::memcpy(values,&var,sizeof(values)); //See discussion below
+  if (sizeof(T) == 1) {
+    for (unsigned int i = 0; i < sizeof(__m128i); i++) { //C++11: Range for also possible
+      sstr << std::hex << (int) values[i] << " " << std::dec;
+    }
+  } else {
+    for (unsigned int i = 0; i < sizeof(__m128i) / sizeof(T); i++) { //C++11: Range for also possible
+      sstr << std::hex << values[i] << " " << std::dec;
+    }
+  }
+  return sstr.str();
+}
+
 SplitOptions SplitOptions::Defaults() { return SplitOptions(); }
 #if defined(COLUMNAR_PLUGIN_USE_AVX512)
 inline __m256i CountPartitionIdOccurrence(const std::vector<int32_t>& partition_id,
@@ -293,6 +318,7 @@ arrow::Status Splitter::Init() {
   partition_cached_recordbatch_size_.resize(num_partitions_);
   partition_lengths_.resize(num_partitions_);
   raw_partition_lengths_.resize(num_partitions_);
+  reducer_offset_offset_.resize(num_partitions_ + 1);
 
   for (int i = 0; i < column_type_id_.size(); ++i) {
     switch (column_type_id_[i]->id()) {
@@ -815,6 +841,26 @@ arrow::Result<int32_t> Splitter::SpillLargestPartition(int64_t* size) {
 }
 
 arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
+#ifdef PROCESSROW
+
+  reducer_offsets_.resize(rb.num_rows());
+
+  reducer_offset_offset_[0] = 0;
+  for (auto pid = 1; pid <= num_partitions_; pid++) {
+    reducer_offset_offset_[pid] =
+        reducer_offset_offset_[pid - 1] + partition_id_cnt_[pid - 1];
+  }
+  for (auto row = 0; row < rb.num_rows(); row++) {
+    auto pid = partition_id_[row];
    reducer_offsets_[reducer_offset_offset_[pid]] = row;
+    _mm_prefetch(reducer_offsets_.data() + reducer_offset_offset_[pid] + 32, _MM_HINT_T0);
+    reducer_offset_offset_[pid]++;
+  }
+  std::transform(reducer_offset_offset_.begin(), std::prev(reducer_offset_offset_.end()),
+                 partition_id_cnt_.begin(), reducer_offset_offset_.begin(),
+                 [](uint16_t x, int16_t y) { return x - y; });
+
+#endif
 
   // for the first input record batch, scan binary arrays and large binary
   // arrays to get their empirical sizes
@@ -922,6 +968,27 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb)
     auto src_addr = const_cast<uint8_t*>(rb.column_data(col_idx)->buffers[1]->data());
     switch (arrow::bit_width(column_type_id_[col_idx]->id())) {
+#ifdef PROCESSROW
+// assume batch size = 32k; reducer# = 4K; row/reducer = 8
+#define PROCESS(_CTYPE)                                                                  \
+  std::transform(partition_buffer_idx_offset_.begin(),                                   \
+                 partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), \
+                 partition_buffer_idx_offset_.begin(),                                   \
+                 [](uint8_t* x, int16_t y) { return x + y * sizeof(_CTYPE); });          \
+  for (auto pid = 0; pid < num_partitions_; pid++) {                                     \
+    auto dst_pid_base =                                                                  \
+        reinterpret_cast<_CTYPE*>(partition_buffer_idx_offset_[pid]); /*32k*/            \
+    auto r = reducer_offset_offset_[pid];                             /*8k*/             \
+    auto size = reducer_offset_offset_[pid + 1];                                         \
+    for (r; r < size; r++) {                                                             \
+      auto src_offset = reducer_offsets_[r];                           /*16k*/           \
+      *dst_pid_base = reinterpret_cast<_CTYPE*>(src_addr)[src_offset]; /*64k*/           \
+      _mm_prefetch(&(src_addr)[src_offset * sizeof(_CTYPE) + 64], _MM_HINT_T2);          \
+      dst_pid_base += 1;                                                                 \
+    }                                                                                    \
+  }                                                                                      \
+  break;
+#else
 #define PROCESS(_CTYPE)                                                                  \
   std::transform(partition_buffer_idx_offset_.begin(),                                   \
                  partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), \
                 partition_buffer_idx_offset_.begin(),                                    \
@@ -932,9 +999,10 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb)
    auto dst_pid_base = reinterpret_cast<_CTYPE*>(partition_buffer_idx_offset_[pid]);     \
    *dst_pid_base = reinterpret_cast<_CTYPE*>(src_addr)[row];                             \
    partition_buffer_idx_offset_[pid] += sizeof(_CTYPE);                                  \
-    _mm_prefetch(&dst_pid_base[1], _MM_HINT_T0);                                         \
+    _mm_prefetch(&dst_pid_base[64 / sizeof(_CTYPE)], _MM_HINT_T0);                       \
  }                                                                                       \
  break;
+#endif
      case 8:
        PROCESS(uint8_t)
      case 16:
        PROCESS(uint16_t)
      case 32:
        PROCESS(uint32_t)
      case 64:
+#ifdef PROCESSAVX
+      std::transform(partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(),
+                     partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(),
+                     [](uint8_t* x, int16_t y) { return x+y*sizeof(uint64_t); });
+      for (auto pid = 0; pid < num_partitions_; pid++)
+      {
+        auto dst_pid_base = reinterpret_cast<uint64_t*>(partition_buffer_idx_offset_[pid]); /*32k*/
+        auto r = reducer_offset_offset_[pid]; /*8k*/
+        auto size = reducer_offset_offset_[pid+1];
+#if 1
+        for (r; r<size && (((uint64_t)dst_pid_base & 0x1f) > 0); r++)
+        {
+          auto src_offset = reducer_offsets_[r]; /*16k*/
+          *dst_pid_base = reinterpret_cast<uint64_t*>(src_addr)[src_offset]; /*64k*/
+          _mm_prefetch(&(src_addr)[src_offset*sizeof(uint64_t)+64], _MM_HINT_T2);
+          dst_pid_base+=1;
+        }
+#if 0
+        for (r; r+2<size; r+=2)
+        {
+          __m128i src_offset_2x = _mm_cvtsi32_si128(*((int32_t*)(reducer_offsets_.data()+r)));
+          src_offset_2x = _mm_shufflelo_epi16(src_offset_2x,0x98);
+
+          __m128i src_2x = _mm_i32gather_epi64((const long long int*)src_addr,src_offset_2x,8);
+          _mm_store_si128((__m128i*)dst_pid_base,src_2x);
+          //_mm_stream_si128((__m128i*)dst_pid_base,src_2x);
+
+          _mm_prefetch(&(src_addr)[(uint32_t)reducer_offsets_[r]*sizeof(uint64_t)+64], _MM_HINT_T2);
+          _mm_prefetch(&(src_addr)[(uint32_t)reducer_offsets_[r+1]*sizeof(uint64_t)+64], _MM_HINT_T2);
+          dst_pid_base+=2;
+        }
+#endif
+        for (r; r<size; r++)
+        {
+          auto src_offset = reducer_offsets_[r]; /*16k*/
+          *dst_pid_base = reinterpret_cast<uint64_t*>(src_addr)[src_offset]; /*64k*/
+          _mm_prefetch(&(src_addr)[src_offset*sizeof(uint64_t)+64], _MM_HINT_T2);
+          dst_pid_base+=1;
+        }
+      }
+      break;
+#else
        PROCESS(uint64_t)
+#endif
+
 #undef PROCESS
      case 128:  // arrow::Decimal128Type::type_id
+#ifdef PROCESSROW
+        // assume batch size = 32k; reducer# = 4K; row/reducer = 8
+        std::transform(
+            partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(),
            partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(),
+            [](uint8_t* x, int16_t y) { return x + y * 16; });
+        for (auto pid = 0; pid < num_partitions_; pid++) {
+          auto dst_pid_base =
+              reinterpret_cast<uint64_t*>(partition_buffer_idx_offset_[pid]); /*32k*/
+          auto r = reducer_offset_offset_[pid];                               /*8k*/
+          auto size = reducer_offset_offset_[pid + 1];
+          for (r; r < size; r++) {
+            auto src_offset = reducer_offsets_[r]; /*16k*/
+            *dst_pid_base =
+                reinterpret_cast<uint64_t*>(src_addr)[src_offset << 1]; /*128k*/
+            *(dst_pid_base + 1) =
+                reinterpret_cast<uint64_t*>(src_addr)[src_offset << 1 | 1]; /*128k*/
+            _mm_prefetch(&(src_addr)[src_offset * 16 + 64], _MM_HINT_T2);
+            dst_pid_base += 2;
+          }
+        }
+        break;
+#else
        std::transform(
            partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(),
            partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(),
@@ -960,6 +1112,7 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb)
                       _MM_HINT_T0);
          }
          break;
+#endif
      case 1:  // arrow::BooleanType::type_id:
        partition_buffer_idx_offset.resize(partition_buffer_idx_base_.size());
        std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
@@ -1159,6 +1312,7 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch&
     if (rb.column_data(col_idx)->GetNullCount() == 0 &&
         column_has_null_[col_idx] == true) {
       // if the input record batch doesn't have null, set validity to True
+      // column_has_null_ is used to skip the partition_id_cnt_[pid] and dst_addrs[pid] access
       for (auto pid = 0; pid < num_partitions_; ++pid) {
         if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] != nullptr) {
           arrow::BitUtil::SetBitsTo(dst_addrs[pid], partition_buffer_idx_base_[pid],
@@ -1406,7 +1560,14 @@ arrow::Status HashSplitter::ComputeAndCountPartitionId(const arrow::RecordBatch&
   for (auto i = 0; i < num_rows; ++i) {
     // positive mod
     auto pid = pid_arr->Value(i) % num_partitions_;
-    if (pid < 0) pid = (pid + num_partitions_) % num_partitions_;
+    //force to generate ASM
+    __asm__ (
+      "lea (%[num_partitions],%[pid],1),%[tmp]\n"
+      "test %[pid],%[pid]\n"
+      "cmovs %[tmp],%[pid]\n"
+      : [pid] "+r"(pid)
+      : [num_partitions]"r"(num_partitions_),[tmp]"r"(0)
+    );
     partition_id_[i] = pid;
     partition_id_cnt_[pid]++;
   }
diff --git a/native-sql-engine/cpp/src/shuffle/splitter.h b/native-sql-engine/cpp/src/shuffle/splitter.h
index 0dfac2f8c..2fb4bb3d4 100644
--- a/native-sql-engine/cpp/src/shuffle/splitter.h
+++ b/native-sql-engine/cpp/src/shuffle/splitter.h
@@ -226,6 +226,10 @@ class Splitter {
   // updated for each input record batch
   // col
   std::vector<int32_t> partition_id_;
+  // [num_rows]
+  std::vector<uint16_t> reducer_offsets_;
+  // [num_partitions]
+  std::vector<uint16_t> reducer_offset_offset_;
   // col
   std::vector<int32_t> partition_id_cnt_;
 
From 94b733dd463118886e98cfd287a50c14d309f55f Mon Sep 17 00:00:00 2001
From: binwei
Date: Sat, 30 Apr 2022 16:42:21 +0800
Subject: [PATCH 2/3] format code

---
 .../src/benchmarks/shuffle_split_benchmark.cc | 217 +++++++++---------
 native-sql-engine/cpp/src/shuffle/splitter.cc | 129 ++++++-----
 .../src/third_party/parallel_hashmap/btree.h  |  21 +-
 .../src/third_party/parallel_hashmap/phmap.h  |  12 +-
 .../third_party/parallel_hashmap/phmap_base.h |  36 +--
 5 files changed, 204 insertions(+), 211 deletions(-)

diff --git a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc
index ec1416641..d2bffe36a 100644
--- a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc
+++
b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc @@ -43,10 +43,8 @@ const int split_buffer_size = 8192; class BenchmarkShuffleSplit { public: - BenchmarkShuffleSplit(std::string file_name) { - GetRecordBatchReader(file_name); - } - + BenchmarkShuffleSplit(std::string file_name) { GetRecordBatchReader(file_name); } + void GetRecordBatchReader(const std::string& input_file) { std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; std::shared_ptr record_batch_reader; @@ -107,8 +105,8 @@ class BenchmarkShuffleSplit { int64_t num_rows = 0; int64_t split_time = 0; - Do_Split(splitter, elapse_read, num_batches, num_rows, split_time, - num_partitions, options, state); + Do_Split(splitter, elapse_read, num_batches, num_rows, split_time, num_partitions, + options, state); auto fs = std::make_shared(); fs->DeleteFile(splitter->DataFile()); @@ -117,56 +115,57 @@ class BenchmarkShuffleSplit { state.counters["rowgroups"] = benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); + benchmark::Counter::OneK::kIs1000); state.counters["columns"] = benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); + benchmark::Counter::OneK::kIs1000); state.counters["batches"] = benchmark::Counter( num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); state.counters["num_rows"] = benchmark::Counter( num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_partitions"] = benchmark::Counter( - num_partitions, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_partitions"] = + benchmark::Counter(num_partitions, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); state.counters["batch_buffer_size"] = benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); + benchmark::Counter::OneK::kIs1024); state.counters["split_buffer_size"] = benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); + benchmark::Counter::OneK::kIs1024); state.counters["bytes_spilled"] = benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); + benchmark::Counter::OneK::kIs1024); state.counters["bytes_written"] = benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); + benchmark::Counter::OneK::kIs1024); state.counters["bytes_raw"] = benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); + benchmark::Counter::OneK::kIs1024); state.counters["bytes_spilled"] = benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); + benchmark::Counter::OneK::kIs1024); state.counters["parquet_parse"] = benchmark::Counter( elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["compute_pid_time"] = - benchmark::Counter(splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); + state.counters["compute_pid_time"] = benchmark::Counter( + splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); state.counters["write_time"] = benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads, - 
benchmark::Counter::OneK::kIs1000); + benchmark::Counter::OneK::kIs1000); state.counters["spill_time"] = benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); + benchmark::Counter::OneK::kIs1000); state.counters["compress_time"] = benchmark::Counter(splitter->TotalCompressTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); + benchmark::Counter::OneK::kIs1000); - split_time = split_time - splitter->TotalSpillTime() - splitter->TotalComputePidTime() - - splitter->TotalCompressTime() - splitter->TotalWriteTime(); + split_time = split_time - splitter->TotalSpillTime() - + splitter->TotalComputePidTime() - splitter->TotalCompressTime() - + splitter->TotalWriteTime(); state.counters["split_time"] = benchmark::Counter( - split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - + split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); } protected: @@ -178,7 +177,8 @@ class BenchmarkShuffleSplit { } virtual void Do_Split(std::shared_ptr& splitter, int64_t& elapse_read, int64_t& num_batches, int64_t& num_rows, int64_t& split_time, - const int num_partitions, SplitOptions options, benchmark::State& state) {} + const int num_partitions, SplitOptions options, + benchmark::State& state) {} protected: std::string file_name; @@ -190,16 +190,15 @@ class BenchmarkShuffleSplit { parquet::ArrowReaderProperties properties; }; +class BenchmarkShuffleSplit_CacheScan_Benchmark : public BenchmarkShuffleSplit { + public: + BenchmarkShuffleSplit_CacheScan_Benchmark(std::string filename) + : BenchmarkShuffleSplit(filename) {} -class BenchmarkShuffleSplit_CacheScan_Benchmark: public BenchmarkShuffleSplit{ -public: -BenchmarkShuffleSplit_CacheScan_Benchmark(std::string filename):BenchmarkShuffleSplit(filename){} - -protected: + protected: void Do_Split(std::shared_ptr& splitter, int64_t& elapse_read, - int64_t& num_batches, int64_t& num_rows, int64_t& split_time, - const int num_partitions, SplitOptions options, benchmark::State& state) { - + int64_t& num_batches, int64_t& num_rows, int64_t& split_time, + const int num_partitions, SplitOptions options, benchmark::State& state) { std::vector local_column_indices; local_column_indices.push_back(0); local_column_indices.push_back(1); @@ -208,7 +207,7 @@ BenchmarkShuffleSplit_CacheScan_Benchmark(std::string filename):BenchmarkShuffle local_column_indices.push_back(5); local_column_indices.push_back(6); local_column_indices.push_back(7); - + std::shared_ptr local_schema; local_schema = std::make_shared(*schema.get()); @@ -222,23 +221,22 @@ BenchmarkShuffleSplit_CacheScan_Benchmark(std::string filename):BenchmarkShuffle ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(8)); ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(3)); - if(state.thread_index() == 0) - std::cout << local_schema->ToString() << std::endl; + if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl; + + ARROW_ASSIGN_OR_THROW(splitter, + Splitter::Make("rr", local_schema, num_partitions, options)); - ARROW_ASSIGN_OR_THROW( - splitter, Splitter::Make("rr", local_schema, num_partitions, options)); - std::shared_ptr record_batch; std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; std::shared_ptr record_batch_reader; ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( - arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties, - &parquet_reader)); + arrow::default_memory_pool(), 
::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); std::vector> batches; - ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, local_column_indices, - &record_batch_reader)); + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( + row_group_indices, local_column_indices, &record_batch_reader)); do { TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); @@ -248,38 +246,36 @@ BenchmarkShuffleSplit_CacheScan_Benchmark(std::string filename):BenchmarkShuffle num_rows += record_batch->num_rows(); } } while (record_batch); - std::cout << "parquet parse done elapsed time " << elapse_read/1000000 << " ms " << std::endl; + std::cout << "parquet parse done elapsed time " << elapse_read / 1000000 << " ms " + << std::endl; std::cout << "batches = " << num_batches << " rows = " << num_rows << std::endl; for (auto _ : state) { - for_each(batches.begin(), batches.end(), - [&splitter, &split_time](std::shared_ptr& record_batch) { - TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); - }); + for_each( + batches.begin(), batches.end(), + [&splitter, &split_time](std::shared_ptr& record_batch) { + TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); + }); } TIME_NANO_OR_THROW(split_time, splitter->Stop()); } - - }; +class BenchmarkShuffleSplit_IterateScan_Benchmark : public BenchmarkShuffleSplit { + public: + BenchmarkShuffleSplit_IterateScan_Benchmark(std::string filename) + : BenchmarkShuffleSplit(filename) {} -class BenchmarkShuffleSplit_IterateScan_Benchmark: public BenchmarkShuffleSplit{ -public: -BenchmarkShuffleSplit_IterateScan_Benchmark(std::string filename):BenchmarkShuffleSplit(filename){} - -protected: + protected: void Do_Split(std::shared_ptr& splitter, int64_t& elapse_read, - int64_t& num_batches, int64_t& num_rows, int64_t& split_time, - const int num_partitions, SplitOptions options, benchmark::State& state) { - - if(state.thread_index() == 0) - std::cout << schema->ToString() << std::endl; + int64_t& num_batches, int64_t& num_rows, int64_t& split_time, + const int num_partitions, SplitOptions options, benchmark::State& state) { + if (state.thread_index() == 0) std::cout << schema->ToString() << std::endl; if (!expr_vector.empty()) { ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions, - expr_vector, std::move(options))); + expr_vector, std::move(options))); } else { ARROW_ASSIGN_OR_THROW( splitter, Splitter::Make("rr", schema, num_partitions, std::move(options))); @@ -290,13 +286,13 @@ BenchmarkShuffleSplit_IterateScan_Benchmark(std::string filename):BenchmarkShuff std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; std::shared_ptr record_batch_reader; ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( - arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties, - &parquet_reader)); + arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); for (auto _ : state) { std::vector> batches; - ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, column_indices, - &record_batch_reader)); + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( + row_group_indices, column_indices, &record_batch_reader)); TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); while (record_batch) { num_batches += 1; @@ -347,26 +343,20 @@ BenchmarkShuffleSplit_IterateScan_Benchmark(std::string filename):BenchmarkShuff } // namespace sparkcolumnarplugin int main(int argc, char** argv) { - 
- uint32_t iterations=1; - uint32_t partitions=512; - uint32_t threads=1; + uint32_t iterations = 1; + uint32_t partitions = 512; + uint32_t threads = 1; std::string datafile; - for (int i=0;iIterations(iterations) - ->Args({partitions, arrow::Compression::FASTPFOR}) - ->Threads(threads) - ->ReportAggregatesOnly(false) - ->MeasureProcessCPUTime() - ->Unit(benchmark::kSecond); - -/* sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark bck(datafile); - - benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck) - ->Iterations(1) - ->Args({96*2, arrow::Compression::FASTPFOR}) - ->Args({96*4, arrow::Compression::FASTPFOR}) - ->Args({96*8, arrow::Compression::FASTPFOR}) - ->Args({96*16, arrow::Compression::FASTPFOR}) - ->Args({96*32, arrow::Compression::FASTPFOR}) - ->Threads(24) + ->Iterations(iterations) + ->Args({partitions, arrow::Compression::FASTPFOR}) + ->Threads(threads) + ->ReportAggregatesOnly(false) + ->MeasureProcessCPUTime() ->Unit(benchmark::kSecond); - benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck) - ->Iterations(1) - ->Args({4096, arrow::Compression::FASTPFOR}) - ->Threads(1) - ->Threads(2) - ->Threads(4) - ->Threads(8) - ->Threads(16) - ->Threads(24) - ->Unit(benchmark::kSecond); -*/ + /* sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark + bck(datafile); + + benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck) + ->Iterations(1) + ->Args({96*2, arrow::Compression::FASTPFOR}) + ->Args({96*4, arrow::Compression::FASTPFOR}) + ->Args({96*8, arrow::Compression::FASTPFOR}) + ->Args({96*16, arrow::Compression::FASTPFOR}) + ->Args({96*32, arrow::Compression::FASTPFOR}) + ->Threads(24) + ->Unit(benchmark::kSecond); + + benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck) + ->Iterations(1) + ->Args({4096, arrow::Compression::FASTPFOR}) + ->Threads(1) + ->Threads(2) + ->Threads(4) + ->Threads(8) + ->Threads(16) + ->Threads(24) + ->Unit(benchmark::kSecond); + */ benchmark::Initialize(&argc, argv); benchmark::RunSpecifiedBenchmarks(); diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index 798668dde..812dc4516 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -25,16 +25,16 @@ #include #include #include +#include -#include -#include -#include #include +#include #include +#include +#include #include "shuffle/utils.h" #include "utils/macros.h" -#include /*#if defined(COLUMNAR_PLUGIN_USE_AVX512) #include @@ -47,24 +47,23 @@ namespace sparkcolumnarplugin { namespace shuffle { using arrow::internal::checked_cast; - - - template std::string __m128i_toString(const __m128i var) { - std::stringstream sstr; - T values[16/sizeof(T)]; - std::memcpy(values,&var,sizeof(values)); //See discussion below - if (sizeof(T) == 1) { - for (unsigned int i = 0; i < sizeof(__m128i); i++) { //C++11: Range for also possible - sstr << std::hex << (int) values[i] << " " << std::dec; - } - } else { - for (unsigned int i = 0; i < sizeof(__m128i) / sizeof(T); i++) { //C++11: Range for also possible - sstr << std::hex << values[i] << " " << std::dec; - } + std::stringstream sstr; + T values[16 / sizeof(T)]; + std::memcpy(values, &var, sizeof(values)); // See discussion below + if (sizeof(T) == 1) { + for (unsigned int i = 0; i < sizeof(__m128i); i++) { // C++11: Range for also + // possible + sstr << std::hex << (int)values[i] << " " << std::dec; + } + } else { + for (unsigned int i = 
0; i < sizeof(__m128i) / sizeof(T); + i++) { // C++11: Range for also possible + sstr << std::hex << values[i] << " " << std::dec; } - return sstr.str(); + } + return sstr.str(); } SplitOptions SplitOptions::Defaults() { return SplitOptions(); } @@ -1011,22 +1010,22 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) PROCESS(uint32_t) case 64: #ifdef PROCESSAVX - std::transform(partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(), - partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(), - [](uint8_t* x, int16_t y) { return x+y*sizeof(uint64_t); }); - for (auto pid = 0; pid < num_partitions_; pid++) - { - auto dst_pid_base = reinterpret_cast(partition_buffer_idx_offset_[pid]); /*32k*/ - auto r = reducer_offset_offset_[pid]; /*8k*/ - auto size = reducer_offset_offset_[pid+1]; + std::transform( + partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(), + partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(), + [](uint8_t* x, int16_t y) { return x + y * sizeof(uint64_t); }); + for (auto pid = 0; pid < num_partitions_; pid++) { + auto dst_pid_base = + reinterpret_cast(partition_buffer_idx_offset_[pid]); /*32k*/ + auto r = reducer_offset_offset_[pid]; /*8k*/ + auto size = reducer_offset_offset_[pid + 1]; #if 1 - for (r; r 0); r++) - { - auto src_offset = reducer_offsets_[r]; /*16k*/ - *dst_pid_base = reinterpret_cast(src_addr)[src_offset]; /*64k*/ - _mm_prefetch(&(src_addr)[src_offset*sizeof(uint64_t)+64], _MM_HINT_T2); - dst_pid_base+=1; - } + for (r; r < size && (((uint64_t)dst_pid_base & 0x1f) > 0); r++) { + auto src_offset = reducer_offsets_[r]; /*16k*/ + *dst_pid_base = reinterpret_cast(src_addr)[src_offset]; /*64k*/ + _mm_prefetch(&(src_addr)[src_offset * sizeof(uint64_t) + 64], _MM_HINT_T2); + dst_pid_base += 1; + } #if 0 for (r; r+4(src_addr)[src_offset]; /*64k*/ - _mm_prefetch(&(src_addr)[src_offset*sizeof(uint64_t)+64], _MM_HINT_T2); - dst_pid_base+=1; - } - } + for (r; r + 2 < size; r += 2) { + __m128i src_offset_2x = + _mm_cvtsi32_si128(*((int32_t*)(reducer_offsets_.data() + r))); + src_offset_2x = _mm_shufflelo_epi16(src_offset_2x, 0x98); + + __m128i src_2x = + _mm_i32gather_epi64((const long long int*)src_addr, src_offset_2x, 8); + _mm_store_si128((__m128i*)dst_pid_base, src_2x); + //_mm_stream_si128((__m128i*)dst_pid_base,src_2x); + + _mm_prefetch( + &(src_addr)[(uint32_t)reducer_offsets_[r] * sizeof(uint64_t) + 64], + _MM_HINT_T2); + _mm_prefetch( + &(src_addr)[(uint32_t)reducer_offsets_[r + 1] * sizeof(uint64_t) + 64], + _MM_HINT_T2); + dst_pid_base += 2; + } +#endif + for (r; r < size; r++) { + auto src_offset = reducer_offsets_[r]; /*16k*/ + *dst_pid_base = reinterpret_cast(src_addr)[src_offset]; /*64k*/ + _mm_prefetch(&(src_addr)[src_offset * sizeof(uint64_t) + 64], _MM_HINT_T2); + dst_pid_base += 1; + } + } break; #else PROCESS(uint64_t) @@ -1075,7 +1078,7 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) #undef PROCESS case 128: // arrow::Decimal128Type::type_id #ifdef PROCESSROW - // assume batch size = 32k; reducer# = 4K; row/reducer = 8 + // assume batch size = 32k; reducer# = 4K; row/reducer = 8 std::transform( partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(), @@ -1312,7 +1315,8 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch& if (rb.column_data(col_idx)->GetNullCount() == 0 && 
column_has_null_[col_idx] == true) { // if the input record batch doesn't have null, set validity to True - // column_has_null_ is used to skip the partition_id_cnt_[pid] and dst_addrs[pid] access + // column_has_null_ is used to skip the partition_id_cnt_[pid] and dst_addrs[pid] + // access for (auto pid = 0; pid < num_partitions_; ++pid) { if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] != nullptr) { arrow::BitUtil::SetBitsTo(dst_addrs[pid], partition_buffer_idx_base_[pid], @@ -1560,14 +1564,13 @@ arrow::Status HashSplitter::ComputeAndCountPartitionId(const arrow::RecordBatch& for (auto i = 0; i < num_rows; ++i) { // positive mod auto pid = pid_arr->Value(i) % num_partitions_; - //force to generate ASM - __asm__ ( + // force to generate ASM + __asm__( "lea (%[num_partitions],%[pid],1),%[tmp]\n" "test %[pid],%[pid]\n" "cmovs %[tmp],%[pid]\n" : [pid] "+r"(pid) - : [num_partitions]"r"(num_partitions_),[tmp]"r"(0) - ); + : [num_partitions] "r"(num_partitions_), [tmp] "r"(0)); partition_id_[i] = pid; partition_id_cnt_[pid]++; } diff --git a/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h b/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h index b9b0d94da..24c2d145b 100644 --- a/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h +++ b/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h @@ -661,9 +661,9 @@ constexpr bool do_less_than_comparison(const Compare& compare, const K& x, const // SFINAE prevents implicit conversions to int (such as from bool). template ::value, int> = 0> constexpr phmap::weak_ordering compare_result_as_ordering(const Int c) { - return c < 0 - ? phmap::weak_ordering::less - : c == 0 ? phmap::weak_ordering::equivalent : phmap::weak_ordering::greater; + return c < 0 ? phmap::weak_ordering::less + : c == 0 ? phmap::weak_ordering::equivalent + : phmap::weak_ordering::greater; } constexpr phmap::weak_ordering compare_result_as_ordering(const phmap::weak_ordering c) { return c; @@ -685,9 +685,9 @@ template < int> = 0> constexpr phmap::weak_ordering do_three_way_comparison(const Compare& compare, const K& x, const LK& y) { - return compare(x, y) ? phmap::weak_ordering::less - : compare(y, x) ? phmap::weak_ordering::greater - : phmap::weak_ordering::equivalent; + return compare(x, y) ? phmap::weak_ordering::less + : compare(y, x) ? phmap::weak_ordering::greater + : phmap::weak_ordering::equivalent; } } // namespace compare_internal @@ -1063,11 +1063,10 @@ class btree_node { // Compute how many values we can fit onto a leaf node taking into account // padding. constexpr static size_type NodeTargetValues(const int begin, const int end) { - return begin == end - ? begin - : SizeWithNValues((begin + end) / 2 + 1) > params_type::kTargetNodeSize - ? NodeTargetValues(begin, (begin + end) / 2) - : NodeTargetValues((begin + end) / 2 + 1, end); + return begin == end ? begin + : SizeWithNValues((begin + end) / 2 + 1) > params_type::kTargetNodeSize + ? NodeTargetValues(begin, (begin + end) / 2) + : NodeTargetValues((begin + end) / 2 + 1, end); } enum { diff --git a/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap.h b/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap.h index 4628cca30..05d227a43 100644 --- a/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap.h +++ b/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap.h @@ -2156,13 +2156,13 @@ class raw_hash_map : public raw_hash_set { // incomplete types as values, as in unordered_map. // MappedReference<> may be a non-reference type. 
template - using MappedReference = decltype( - P::value(std::addressof(std::declval()))); + using MappedReference = decltype(P::value( + std::addressof(std::declval()))); // MappedConstReference<> may be a non-reference type. template - using MappedConstReference = decltype( - P::value(std::addressof(std::declval()))); + using MappedConstReference = decltype(P::value( + std::addressof(std::declval()))); using KeyArgImpl = KeyArg::value && IsTransparent::value>; @@ -3409,8 +3409,8 @@ class parallel_hash_map // incomplete types as values, as in unordered_map. // MappedReference<> may be a non-reference type. template - using MappedReference = decltype( - P::value(std::addressof(std::declval()))); + using MappedReference = decltype(P::value( + std::addressof(std::declval()))); // MappedConstReference<> may be a non-reference type. template diff --git a/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap_base.h b/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap_base.h index 3b3b6b120..0f4e6375d 100644 --- a/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap_base.h +++ b/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap_base.h @@ -826,8 +826,8 @@ struct Invoker { // The result type of Invoke. template -using InvokeT = decltype( - Invoker::type::Invoke(std::declval(), std::declval()...)); +using InvokeT = decltype(Invoker::type::Invoke(std::declval(), + std::declval()...)); // Invoke(f, args...) is an implementation of INVOKE(f, args...) from section // [func.require] of the C++ standard. @@ -1002,9 +1002,10 @@ constexpr T&& forward( namespace utility_internal { // Helper method for expanding tuple into a called method. template -auto apply_helper(Functor&& functor, Tuple&& t, index_sequence) -> decltype( - phmap::base_internal::Invoke(phmap::forward(functor), - std::get(phmap::forward(t))...)) { +auto apply_helper(Functor&& functor, Tuple&& t, index_sequence) + -> decltype(phmap::base_internal::Invoke( + phmap::forward(functor), + std::get(phmap::forward(t))...)) { return phmap::base_internal::Invoke(phmap::forward(functor), std::get(phmap::forward(t))...); } @@ -1887,19 +1888,18 @@ class optional_assign_base { template constexpr copy_traits get_ctor_copy_traits() { - return std::is_copy_constructible::value - ? copy_traits::copyable - : std::is_move_constructible::value ? copy_traits::movable - : copy_traits::non_movable; + return std::is_copy_constructible::value ? copy_traits::copyable + : std::is_move_constructible::value ? copy_traits::movable + : copy_traits::non_movable; } template constexpr copy_traits get_assign_copy_traits() { return phmap::is_copy_assignable::value && std::is_copy_constructible::value ? copy_traits::copyable - : phmap::is_move_assignable::value && std::is_move_constructible::value - ? copy_traits::movable - : copy_traits::non_movable; + : phmap::is_move_assignable::value && std::is_move_constructible::value + ? copy_traits::movable + : copy_traits::non_movable; } // Whether T is constructible or convertible from optional. @@ -2421,9 +2421,9 @@ constexpr optional make_optional(std::initializer_list il, Args&&... args) template constexpr auto operator==(const optional& x, const optional& y) -> decltype(optional_internal::convertible_to_bool(*x == *y)) { - return static_cast(x) != static_cast(y) - ? false - : static_cast(x) == false ? true : static_cast(*x == *y); + return static_cast(x) != static_cast(y) ? false + : static_cast(x) == false ? 
true + : static_cast(*x == *y); } // Returns: If bool(x) != bool(y), true; otherwise, if bool(x) == false, false; @@ -2431,9 +2431,9 @@ constexpr auto operator==(const optional& x, const optional& y) template constexpr auto operator!=(const optional& x, const optional& y) -> decltype(optional_internal::convertible_to_bool(*x != *y)) { - return static_cast(x) != static_cast(y) - ? true - : static_cast(x) == false ? false : static_cast(*x != *y); + return static_cast(x) != static_cast(y) ? true + : static_cast(x) == false ? false + : static_cast(*x != *y); } // Returns: If !y, false; otherwise, if !x, true; otherwise *x < *y. template From 1b31583dcf3bc6b621decc5470ba7f3acdc3b747 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Sat, 30 Apr 2022 21:16:49 +0800 Subject: [PATCH 3/3] fix format Signed-off-by: Yuan Zhou --- native-sql-engine/cpp/src/shuffle/splitter.cc | 4 +-- .../src/third_party/parallel_hashmap/btree.h | 21 +++++------ .../src/third_party/parallel_hashmap/phmap.h | 12 +++---- .../third_party/parallel_hashmap/phmap_base.h | 36 +++++++++---------- 4 files changed, 37 insertions(+), 36 deletions(-) diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index 812dc4516..7839c4ce4 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -1569,8 +1569,8 @@ arrow::Status HashSplitter::ComputeAndCountPartitionId(const arrow::RecordBatch& "lea (%[num_partitions],%[pid],1),%[tmp]\n" "test %[pid],%[pid]\n" "cmovs %[tmp],%[pid]\n" - : [pid] "+r"(pid) - : [num_partitions] "r"(num_partitions_), [tmp] "r"(0)); + : [ pid ] "+r"(pid) + : [ num_partitions ] "r"(num_partitions_), [ tmp ] "r"(0)); partition_id_[i] = pid; partition_id_cnt_[pid]++; } diff --git a/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h b/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h index 24c2d145b..b9b0d94da 100644 --- a/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h +++ b/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h @@ -661,9 +661,9 @@ constexpr bool do_less_than_comparison(const Compare& compare, const K& x, const // SFINAE prevents implicit conversions to int (such as from bool). template ::value, int> = 0> constexpr phmap::weak_ordering compare_result_as_ordering(const Int c) { - return c < 0 ? phmap::weak_ordering::less - : c == 0 ? phmap::weak_ordering::equivalent - : phmap::weak_ordering::greater; + return c < 0 + ? phmap::weak_ordering::less + : c == 0 ? phmap::weak_ordering::equivalent : phmap::weak_ordering::greater; } constexpr phmap::weak_ordering compare_result_as_ordering(const phmap::weak_ordering c) { return c; @@ -685,9 +685,9 @@ template < int> = 0> constexpr phmap::weak_ordering do_three_way_comparison(const Compare& compare, const K& x, const LK& y) { - return compare(x, y) ? phmap::weak_ordering::less - : compare(y, x) ? phmap::weak_ordering::greater - : phmap::weak_ordering::equivalent; + return compare(x, y) ? phmap::weak_ordering::less + : compare(y, x) ? phmap::weak_ordering::greater + : phmap::weak_ordering::equivalent; } } // namespace compare_internal @@ -1063,10 +1063,11 @@ class btree_node { // Compute how many values we can fit onto a leaf node taking into account // padding. constexpr static size_type NodeTargetValues(const int begin, const int end) { - return begin == end ? begin - : SizeWithNValues((begin + end) / 2 + 1) > params_type::kTargetNodeSize - ? 
diff --git a/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h b/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h
index 24c2d145b..b9b0d94da 100644
--- a/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h
+++ b/native-sql-engine/cpp/src/third_party/parallel_hashmap/btree.h
@@ -661,9 +661,9 @@ constexpr bool do_less_than_comparison(const Compare& compare, const K& x, const
 // SFINAE prevents implicit conversions to int (such as from bool).
 template <typename Int, phmap::enable_if_t<std::is_same<int, Int>::value, int> = 0>
 constexpr phmap::weak_ordering compare_result_as_ordering(const Int c) {
-  return c < 0 ? phmap::weak_ordering::less
-               : c == 0 ? phmap::weak_ordering::equivalent
-                        : phmap::weak_ordering::greater;
+  return c < 0
+             ? phmap::weak_ordering::less
+             : c == 0 ? phmap::weak_ordering::equivalent : phmap::weak_ordering::greater;
 }
 constexpr phmap::weak_ordering compare_result_as_ordering(const phmap::weak_ordering c) {
   return c;
@@ -685,9 +685,9 @@ template <
     int> = 0>
 constexpr phmap::weak_ordering do_three_way_comparison(const Compare& compare,
                                                        const K& x, const LK& y) {
-  return compare(x, y) ? phmap::weak_ordering::less
-         : compare(y, x) ? phmap::weak_ordering::greater
-                         : phmap::weak_ordering::equivalent;
+  return compare(x, y) ? phmap::weak_ordering::less
+                       : compare(y, x) ? phmap::weak_ordering::greater
+                                       : phmap::weak_ordering::equivalent;
 }
 
 }  // namespace compare_internal
@@ -1063,10 +1063,11 @@ class btree_node {
   // Compute how many values we can fit onto a leaf node taking into account
   // padding.
   constexpr static size_type NodeTargetValues(const int begin, const int end) {
-    return begin == end ? begin
-           : SizeWithNValues((begin + end) / 2 + 1) > params_type::kTargetNodeSize
-               ? NodeTargetValues(begin, (begin + end) / 2)
-               : NodeTargetValues((begin + end) / 2 + 1, end);
+    return begin == end
+               ? begin
+               : SizeWithNValues((begin + end) / 2 + 1) > params_type::kTargetNodeSize
+                     ? NodeTargetValues(begin, (begin + end) / 2)
+                     : NodeTargetValues((begin + end) / 2 + 1, end);
   }
 
   enum {
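The reformatted NodeTargetValues above is a constexpr binary search: it narrows [begin, end] to the largest value count whose node footprint (SizeWithNValues) still fits under params_type::kTargetNodeSize. A self-contained sketch of the same recursion, with illustrative stand-ins for the size function and target (the real library derives both from the node layout):

    // Illustrative stand-ins, not btree.h's actual parameters.
    constexpr int kTargetNodeSize = 256;
    constexpr int SizeWithNValues(int n) { return 16 + n * 24; }  // header + slots

    // Binary-search the largest n in [begin, end] with SizeWithNValues(n) <= target:
    // probe the upper half's first candidate; shrink toward whichever half fits.
    constexpr int NodeTargetValues(int begin, int end) {
      return begin == end
                 ? begin
                 : SizeWithNValues((begin + end) / 2 + 1) > kTargetNodeSize
                       ? NodeTargetValues(begin, (begin + end) / 2)
                       : NodeTargetValues((begin + end) / 2 + 1, end);
    }

    static_assert(NodeTargetValues(1, 100) == 10, "(256 - 16) / 24 == 10");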
diff --git a/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap.h b/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap.h
index 05d227a43..4628cca30 100644
--- a/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap.h
+++ b/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap.h
@@ -2156,13 +2156,13 @@ class raw_hash_map : public raw_hash_set<Policy, Hash, Eq, Alloc> {
   // incomplete types as values, as in unordered_map.
   // MappedReference<> may be a non-reference type.
   template <class P>
-  using MappedReference = decltype(P::value(
-      std::addressof(std::declval<typename raw_hash_map::reference>())));
+  using MappedReference = decltype(
+      P::value(std::addressof(std::declval<typename raw_hash_map::reference>())));
 
   // MappedConstReference<> may be a non-reference type.
   template <class P>
-  using MappedConstReference = decltype(P::value(
-      std::addressof(std::declval<typename raw_hash_map::const_reference>())));
+  using MappedConstReference = decltype(
+      P::value(std::addressof(std::declval<typename raw_hash_map::const_reference>())));
 
   using KeyArgImpl =
       KeyArg<IsTransparent<Eq>::value && IsTransparent<Hash>::value>;
@@ -3409,8 +3409,8 @@ class parallel_hash_map
   // incomplete types as values, as in unordered_map.
   // MappedReference<> may be a non-reference type.
   template <class P>
-  using MappedReference = decltype(P::value(
-      std::addressof(std::declval<typename parallel_hash_map::reference>())));
+  using MappedReference = decltype(
+      P::value(std::addressof(std::declval<typename parallel_hash_map::reference>())));
 
   // MappedConstReference<> may be a non-reference type.
   template <class P>
diff --git a/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap_base.h b/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap_base.h
index 0f4e6375d..3b3b6b120 100644
--- a/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap_base.h
+++ b/native-sql-engine/cpp/src/third_party/parallel_hashmap/phmap_base.h
@@ -826,8 +826,8 @@ struct Invoker {
 
 // The result type of Invoke.
 template <typename F, typename... Args>
-using InvokeT = decltype(Invoker<F, Args...>::type::Invoke(std::declval<F>(),
-                                                           std::declval<Args>()...));
+using InvokeT = decltype(
+    Invoker<F, Args...>::type::Invoke(std::declval<F>(), std::declval<Args>()...));
 
 // Invoke(f, args...) is an implementation of INVOKE(f, args...) from section
 // [func.require] of the C++ standard.
@@ -1002,10 +1002,9 @@ constexpr T&& forward(
 namespace utility_internal {
 // Helper method for expanding tuple into a called method.
 template <typename Functor, typename Tuple, std::size_t... Indexes>
-auto apply_helper(Functor&& functor, Tuple&& t, index_sequence<Indexes...>)
-    -> decltype(phmap::base_internal::Invoke(
-        phmap::forward<Functor>(functor),
-        std::get<Indexes>(phmap::forward<Tuple>(t))...)) {
+auto apply_helper(Functor&& functor, Tuple&& t, index_sequence<Indexes...>) -> decltype(
+    phmap::base_internal::Invoke(phmap::forward<Functor>(functor),
+                                 std::get<Indexes>(phmap::forward<Tuple>(t))...)) {
   return phmap::base_internal::Invoke(phmap::forward<Functor>(functor),
                                       std::get<Indexes>(phmap::forward<Tuple>(t))...);
 }
@@ -1888,18 +1887,19 @@ class optional_assign_base {
 
 template <typename T>
 constexpr copy_traits get_ctor_copy_traits() {
-  return std::is_copy_constructible<T>::value ? copy_traits::copyable
-         : std::is_move_constructible<T>::value ? copy_traits::movable
-                                                : copy_traits::non_movable;
+  return std::is_copy_constructible<T>::value
+             ? copy_traits::copyable
+             : std::is_move_constructible<T>::value ? copy_traits::movable
+                                                    : copy_traits::non_movable;
 }
 
 template <typename T>
 constexpr copy_traits get_assign_copy_traits() {
   return phmap::is_copy_assignable<T>::value && std::is_copy_constructible<T>::value
              ? copy_traits::copyable
-         : phmap::is_move_assignable<T>::value && std::is_move_constructible<T>::value
-             ? copy_traits::movable
-             : copy_traits::non_movable;
+             : phmap::is_move_assignable<T>::value && std::is_move_constructible<T>::value
+                   ? copy_traits::movable
+                   : copy_traits::non_movable;
 }
 
 // Whether T is constructible or convertible from optional.
@@ -2421,9 +2421,9 @@ constexpr optional<T> make_optional(std::initializer_list<U> il, Args&&... args)
 template <typename T, typename U>
 constexpr auto operator==(const optional<T>& x, const optional<U>& y)
     -> decltype(optional_internal::convertible_to_bool(*x == *y)) {
-  return static_cast<bool>(x) != static_cast<bool>(y) ? false
-         : static_cast<bool>(x) == false ? true
-                                         : static_cast<bool>(*x == *y);
+  return static_cast<bool>(x) != static_cast<bool>(y)
+             ? false
+             : static_cast<bool>(x) == false ? true : static_cast<bool>(*x == *y);
 }
 
 // Returns: If bool(x) != bool(y), true; otherwise, if bool(x) == false, false;
@@ -2431,9 +2431,9 @@ constexpr auto operator!=(const optional<T>& x, const optional<U>& y)
 template <typename T, typename U>
 constexpr auto operator!=(const optional<T>& x, const optional<U>& y)
     -> decltype(optional_internal::convertible_to_bool(*x != *y)) {
-  return static_cast<bool>(x) != static_cast<bool>(y) ? true
-         : static_cast<bool>(x) == false ? false
-                                         : static_cast<bool>(*x != *y);
+  return static_cast<bool>(x) != static_cast<bool>(y)
+             ? true
+             : static_cast<bool>(x) == false ? false : static_cast<bool>(*x != *y);
 }
 // Returns: If !y, false; otherwise, if !x, true; otherwise *x < *y.
 template <typename T, typename U>
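For context on the apply_helper lines reshuffled above: the pattern expands a tuple through an index_sequence and forwards each element into a call, with the trailing decltype keeping the helper SFINAE-friendly. A simplified, self-contained C++14 sketch of the same idea (plain callables only; phmap's real implementation also routes member pointers through Invoker):

    #include <cstddef>
    #include <iostream>
    #include <tuple>
    #include <utility>

    // Expands the tuple through an index_sequence, mirroring phmap_base.h's
    // apply_helper; decltype in the return type mirrors the reformatted lines.
    template <typename Functor, typename Tuple, std::size_t... Indexes>
    auto apply_helper(Functor&& functor, Tuple&& t, std::index_sequence<Indexes...>)
        -> decltype(std::forward<Functor>(functor)(
            std::get<Indexes>(std::forward<Tuple>(t))...)) {
      return std::forward<Functor>(functor)(
          std::get<Indexes>(std::forward<Tuple>(t))...);
    }

    int main() {
      auto sum = [](int a, int b, int c) { return a + b + c; };
      auto args = std::make_tuple(1, 2, 3);
      // make_index_sequence<3> generates indexes 0, 1, 2 for std::get expansion.
      std::cout << apply_helper(sum, args, std::make_index_sequence<3>{}) << "\n";  // 6
      return 0;
    }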