diff --git a/native-sql-engine/cpp/CMakeLists.txt b/native-sql-engine/cpp/CMakeLists.txt index a1301fd1d..fe7e989ee 100644 --- a/native-sql-engine/cpp/CMakeLists.txt +++ b/native-sql-engine/cpp/CMakeLists.txt @@ -1,6 +1,9 @@ cmake_minimum_required(VERSION 3.16) project(spark_columnar_plugin) +#add_definitions(-DSKIPWRITE -DSKIPCOMPRESS -DPROCESSROW) +add_definitions(-DPROCESSROW) + #add_compile_options(-g) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) diff --git a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc index cd81ef877..d2bffe36a 100644 --- a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc +++ b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc @@ -41,24 +41,10 @@ namespace shuffle { const int batch_buffer_size = 32768; const int split_buffer_size = 8192; -class BenchmarkShuffleSplit : public ::benchmark::Fixture { +class BenchmarkShuffleSplit { public: - BenchmarkShuffleSplit() { - file_name = - "/mnt/DP_disk1/lineitem/" - "part-00025-356249a2-c285-42b9-8a18-5b10be61e0c4-c000.snappy.parquet"; + BenchmarkShuffleSplit(std::string file_name) { GetRecordBatchReader(file_name); } - GetRecordBatchReader(file_name); - std::cout << schema->ToString() << std::endl; - const auto& fields = schema->fields(); - for (const auto& field : fields) { - if (field->name() == "l_orderkey") { - auto node = gandiva::TreeExprBuilder::MakeField(field); - expr_vector.push_back(gandiva::TreeExprBuilder::MakeExpression( - std::move(node), arrow::field("res_" + field->name(), field->type()))); - } - } - } void GetRecordBatchReader(const std::string& input_file) { std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; std::shared_ptr record_batch_reader; @@ -89,11 +75,98 @@ class BenchmarkShuffleSplit : public ::benchmark::Fixture { for (int i = 0; i < num_columns; ++i) { column_indices.push_back(i); } + const auto& fields = schema->fields(); + for (const auto& field : fields) { + if (field->name() == "l_orderkey") { + auto node = gandiva::TreeExprBuilder::MakeField(field); + expr_vector.push_back(gandiva::TreeExprBuilder::MakeExpression( + std::move(node), arrow::field("res_" + field->name(), field->type()))); + } + } } - void SetUp(const ::benchmark::State& state) {} - - void TearDown(const ::benchmark::State& state) {} + void operator()(benchmark::State& state) { + SetCPU(state.thread_index()); + arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1); + + const int num_partitions = state.range(0); + + auto options = SplitOptions::Defaults(); + options.compression_type = compression_type; + options.buffer_size = split_buffer_size; + options.buffered_write = true; + options.offheap_per_task = 128 * 1024 * 1024 * 1024L; + options.prefer_spill = true; + options.write_schema = false; + + std::shared_ptr splitter; + int64_t elapse_read = 0; + int64_t num_batches = 0; + int64_t num_rows = 0; + int64_t split_time = 0; + + Do_Split(splitter, elapse_read, num_batches, num_rows, split_time, num_partitions, + options, state); + + auto fs = std::make_shared(); + fs->DeleteFile(splitter->DataFile()); + + state.SetBytesProcessed(int64_t(splitter->RawPartitionBytes())); + + state.counters["rowgroups"] = + benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["columns"] = + benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["batches"] = benchmark::Counter( + num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_rows"] = benchmark::Counter( + num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_partitions"] = + benchmark::Counter(num_partitions, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["batch_buffer_size"] = + benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["split_buffer_size"] = + benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["bytes_spilled"] = + benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["bytes_written"] = + benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["bytes_raw"] = + benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["bytes_spilled"] = + benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["parquet_parse"] = benchmark::Counter( + elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["compute_pid_time"] = benchmark::Counter( + splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["write_time"] = + benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["spill_time"] = + benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["compress_time"] = + benchmark::Counter(splitter->TotalCompressTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + + split_time = split_time - splitter->TotalSpillTime() - + splitter->TotalComputePidTime() - splitter->TotalCompressTime() - + splitter->TotalWriteTime(); + state.counters["split_time"] = benchmark::Counter( + split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + } protected: long SetCPU(uint32_t cpuindex) { @@ -102,8 +175,9 @@ class BenchmarkShuffleSplit : public ::benchmark::Fixture { CPU_SET(cpuindex, &cs); return sched_setaffinity(0, sizeof(cs), &cs); } - virtual void Do_Split(const std::shared_ptr& splitter, int64_t& elapse_read, + virtual void Do_Split(std::shared_ptr& splitter, int64_t& elapse_read, int64_t& num_batches, int64_t& num_rows, int64_t& split_time, + const int num_partitions, SplitOptions options, benchmark::State& state) {} protected: @@ -116,232 +190,120 @@ class BenchmarkShuffleSplit : public ::benchmark::Fixture { parquet::ArrowReaderProperties properties; }; -BENCHMARK_DEFINE_F(BenchmarkShuffleSplit, CacheScan)(benchmark::State& state) { - SetCPU(state.thread_index()); - - arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1); - - const int num_partitions = state.range(0); +class BenchmarkShuffleSplit_CacheScan_Benchmark : public BenchmarkShuffleSplit { + public: + BenchmarkShuffleSplit_CacheScan_Benchmark(std::string filename) + : BenchmarkShuffleSplit(filename) {} - auto options = SplitOptions::Defaults(); - options.compression_type = compression_type; - options.buffer_size = split_buffer_size; - options.buffered_write = true; - options.offheap_per_task = 128 * 1024 * 1024 * 1024L; - options.prefer_spill = true; - options.write_schema = false; + protected: + void Do_Split(std::shared_ptr& splitter, int64_t& elapse_read, + int64_t& num_batches, int64_t& num_rows, int64_t& split_time, + const int num_partitions, SplitOptions options, benchmark::State& state) { + std::vector local_column_indices; + local_column_indices.push_back(0); + local_column_indices.push_back(1); + local_column_indices.push_back(2); + local_column_indices.push_back(4); + local_column_indices.push_back(5); + local_column_indices.push_back(6); + local_column_indices.push_back(7); + + std::shared_ptr local_schema; + local_schema = std::make_shared(*schema.get()); + + ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(15)); + ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(14)); + ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(13)); + ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(12)); + ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(11)); + ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(10)); + ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(9)); + ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(8)); + ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(3)); + + if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl; + + ARROW_ASSIGN_OR_THROW(splitter, + Splitter::Make("rr", local_schema, num_partitions, options)); + + std::shared_ptr record_batch; - std::shared_ptr splitter; + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); - if (!expr_vector.empty()) { - ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions, - expr_vector, std::move(options))); - } else { - ARROW_ASSIGN_OR_THROW( - splitter, Splitter::Make("rr", schema, num_partitions, std::move(options))); - } + std::vector> batches; + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( + row_group_indices, local_column_indices, &record_batch_reader)); + do { + TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); - std::shared_ptr record_batch; - int64_t elapse_read = 0; - int64_t num_batches = 0; - int64_t num_rows = 0; - int64_t split_time = 0; - - std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; - std::shared_ptr record_batch_reader; - ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( - arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties, - &parquet_reader)); - - std::vector> batches; - ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, column_indices, - &record_batch_reader)); - do { - TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); - - if (record_batch) { - batches.push_back(record_batch); - num_batches += 1; - num_rows += record_batch->num_rows(); + if (record_batch) { + batches.push_back(record_batch); + num_batches += 1; + num_rows += record_batch->num_rows(); + } + } while (record_batch); + std::cout << "parquet parse done elapsed time " << elapse_read / 1000000 << " ms " + << std::endl; + std::cout << "batches = " << num_batches << " rows = " << num_rows << std::endl; + + for (auto _ : state) { + for_each( + batches.begin(), batches.end(), + [&splitter, &split_time](std::shared_ptr& record_batch) { + TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); + }); } - } while (record_batch); - for (auto _ : state) { - for_each(batches.begin(), batches.end(), - [&splitter, &split_time](std::shared_ptr& record_batch) { - TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); - }); + TIME_NANO_OR_THROW(split_time, splitter->Stop()); } +}; - TIME_NANO_OR_THROW(split_time, splitter->Stop()); - - auto fs = std::make_shared(); - fs->DeleteFile(splitter->DataFile()); - - state.SetBytesProcessed(int64_t(splitter->RawPartitionBytes())); - - state.counters["rowgroups"] = - benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["columns"] = - benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["batches"] = benchmark::Counter( - num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_rows"] = benchmark::Counter( - num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_partitions"] = benchmark::Counter( - num_partitions, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["batch_buffer_size"] = - benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["split_buffer_size"] = - benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - - state.counters["bytes_spilled"] = - benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_written"] = - benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_raw"] = - benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_spilled"] = - benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - - state.counters["parquet_parse"] = benchmark::Counter( - elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["compute_pid_time"] = - benchmark::Counter(splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["write_time"] = - benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["spill_time"] = - benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["compress_time"] = - benchmark::Counter(splitter->TotalCompressTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - - split_time = split_time - splitter->TotalSpillTime() - splitter->TotalComputePidTime() - - splitter->TotalCompressTime() - splitter->TotalWriteTime(); - state.counters["split_time"] = benchmark::Counter( - split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); -} - -BENCHMARK_DEFINE_F(BenchmarkShuffleSplit, IterateScan)(benchmark::State& state) { - SetCPU(state.thread_index()); - - arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1); - - const int num_partitions = state.range(0); - - auto options = SplitOptions::Defaults(); - options.compression_type = compression_type; - options.buffer_size = split_buffer_size; - options.buffered_write = true; - options.offheap_per_task = 128 * 1024 * 1024 * 1024L; - options.prefer_spill = true; - options.write_schema = false; - - std::shared_ptr splitter; - - if (!expr_vector.empty()) { - ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions, - expr_vector, std::move(options))); - } else { - ARROW_ASSIGN_OR_THROW( - splitter, Splitter::Make("rr", schema, num_partitions, std::move(options))); - } +class BenchmarkShuffleSplit_IterateScan_Benchmark : public BenchmarkShuffleSplit { + public: + BenchmarkShuffleSplit_IterateScan_Benchmark(std::string filename) + : BenchmarkShuffleSplit(filename) {} - int64_t elapse_read = 0; - int64_t num_batches = 0; - int64_t num_rows = 0; - int64_t split_time = 0; + protected: + void Do_Split(std::shared_ptr& splitter, int64_t& elapse_read, + int64_t& num_batches, int64_t& num_rows, int64_t& split_time, + const int num_partitions, SplitOptions options, benchmark::State& state) { + if (state.thread_index() == 0) std::cout << schema->ToString() << std::endl; + + if (!expr_vector.empty()) { + ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions, + expr_vector, std::move(options))); + } else { + ARROW_ASSIGN_OR_THROW( + splitter, Splitter::Make("rr", schema, num_partitions, std::move(options))); + } - std::shared_ptr record_batch; + std::shared_ptr record_batch; - std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; - std::shared_ptr record_batch_reader; - ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( - arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties, - &parquet_reader)); + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); - for (auto _ : state) { - std::vector> batches; - ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, column_indices, - &record_batch_reader)); - TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); - while (record_batch) { - num_batches += 1; - num_rows += record_batch->num_rows(); - TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); + for (auto _ : state) { + std::vector> batches; + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( + row_group_indices, column_indices, &record_batch_reader)); TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + while (record_batch) { + num_batches += 1; + num_rows += record_batch->num_rows(); + TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); + TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + } } + TIME_NANO_OR_THROW(split_time, splitter->Stop()); } - TIME_NANO_OR_THROW(split_time, splitter->Stop()); - - auto fs = std::make_shared(); - fs->DeleteFile(splitter->DataFile()); - - state.SetBytesProcessed(int64_t(splitter->RawPartitionBytes())); - - state.counters["rowgroups"] = - benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["columns"] = - benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["batches"] = benchmark::Counter( - num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_rows"] = benchmark::Counter( - num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_partitions"] = benchmark::Counter( - num_partitions, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["batch_buffer_size"] = - benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["split_buffer_size"] = - benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - - state.counters["bytes_spilled"] = - benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_written"] = - benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_raw"] = - benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_spilled"] = - benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - - state.counters["parquet_parse"] = benchmark::Counter( - elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["compute_pid_time"] = - benchmark::Counter(splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["write_time"] = - benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["spill_time"] = - benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["compress_time"] = - benchmark::Counter(splitter->TotalCompressTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - - split_time = split_time - splitter->TotalSpillTime() - splitter->TotalComputePidTime() - - splitter->TotalCompressTime() - splitter->TotalWriteTime(); - state.counters["split_time"] = benchmark::Counter( - split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); -} +}; /*BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, CacheScan)->Iterations(1) ->Args({96*2, arrow::Compression::FASTPFOR}) @@ -370,14 +332,74 @@ BENCHMARK_DEFINE_F(BenchmarkShuffleSplit, IterateScan)(benchmark::State& state) ->Threads(16) ->Threads(24) ->Unit(benchmark::kSecond);*/ -BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, IterateScan) - ->Iterations(1) - ->Args({96 * 16, arrow::Compression::FASTPFOR}) - ->Threads(24) +/*BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, CacheScan) + ->Iterations(1000000) + ->Args({512, arrow::Compression::FASTPFOR}) + ->Threads(1) ->ReportAggregatesOnly(false) ->MeasureProcessCPUTime() - ->Unit(benchmark::kSecond); + ->Unit(benchmark::kSecond);*/ } // namespace shuffle } // namespace sparkcolumnarplugin -BENCHMARK_MAIN(); +int main(int argc, char** argv) { + uint32_t iterations = 1; + uint32_t partitions = 512; + uint32_t threads = 1; + std::string datafile; + + for (int i = 0; i < argc; i++) { + if (strcmp(argv[i], "--iterations") == 0) { + iterations = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--partitions") == 0) { + partitions = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--threads") == 0) { + threads = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--file") == 0) { + datafile = argv[i + 1]; + } + } + std::cout << "iterations = " << iterations << std::endl; + std::cout << "partitions = " << partitions << std::endl; + std::cout << "threads = " << threads << std::endl; + std::cout << "datafile = " << datafile << std::endl; + + sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_CacheScan_Benchmark bck(datafile); + + benchmark::RegisterBenchmark("BenchmarkShuffleSplit::CacheScan", bck) + ->Iterations(iterations) + ->Args({partitions, arrow::Compression::FASTPFOR}) + ->Threads(threads) + ->ReportAggregatesOnly(false) + ->MeasureProcessCPUTime() + ->Unit(benchmark::kSecond); + + /* sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark + bck(datafile); + + benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck) + ->Iterations(1) + ->Args({96*2, arrow::Compression::FASTPFOR}) + ->Args({96*4, arrow::Compression::FASTPFOR}) + ->Args({96*8, arrow::Compression::FASTPFOR}) + ->Args({96*16, arrow::Compression::FASTPFOR}) + ->Args({96*32, arrow::Compression::FASTPFOR}) + ->Threads(24) + ->Unit(benchmark::kSecond); + + benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck) + ->Iterations(1) + ->Args({4096, arrow::Compression::FASTPFOR}) + ->Threads(1) + ->Threads(2) + ->Threads(4) + ->Threads(8) + ->Threads(16) + ->Threads(24) + ->Unit(benchmark::kSecond); + */ + + benchmark::Initialize(&argc, argv); + benchmark::RunSpecifiedBenchmarks(); + benchmark::Shutdown(); +} \ No newline at end of file diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index e739bd04f..7839c4ce4 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -25,23 +25,47 @@ #include #include #include +#include +#include #include +#include +#include #include #include "shuffle/utils.h" #include "utils/macros.h" -#if defined(COLUMNAR_PLUGIN_USE_AVX512) +/*#if defined(COLUMNAR_PLUGIN_USE_AVX512) #include #else #include #endif +*/ namespace sparkcolumnarplugin { namespace shuffle { using arrow::internal::checked_cast; +template +std::string __m128i_toString(const __m128i var) { + std::stringstream sstr; + T values[16 / sizeof(T)]; + std::memcpy(values, &var, sizeof(values)); // See discussion below + if (sizeof(T) == 1) { + for (unsigned int i = 0; i < sizeof(__m128i); i++) { // C++11: Range for also + // possible + sstr << std::hex << (int)values[i] << " " << std::dec; + } + } else { + for (unsigned int i = 0; i < sizeof(__m128i) / sizeof(T); + i++) { // C++11: Range for also possible + sstr << std::hex << values[i] << " " << std::dec; + } + } + return sstr.str(); +} + SplitOptions SplitOptions::Defaults() { return SplitOptions(); } #if defined(COLUMNAR_PLUGIN_USE_AVX512) inline __m256i CountPartitionIdOccurrence(const std::vector& partition_id, @@ -293,6 +317,7 @@ arrow::Status Splitter::Init() { partition_cached_recordbatch_size_.resize(num_partitions_); partition_lengths_.resize(num_partitions_); raw_partition_lengths_.resize(num_partitions_); + reducer_offset_offset_.resize(num_partitions_ + 1); for (int i = 0; i < column_type_id_.size(); ++i) { switch (column_type_id_[i]->id()) { @@ -815,6 +840,26 @@ arrow::Result Splitter::SpillLargestPartition(int64_t* size) { } arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) { +#ifdef PROCESSROW + + reducer_offsets_.resize(rb.num_rows()); + + reducer_offset_offset_[0] = 0; + for (auto pid = 1; pid <= num_partitions_; pid++) { + reducer_offset_offset_[pid] = + reducer_offset_offset_[pid - 1] + partition_id_cnt_[pid - 1]; + } + for (auto row = 0; row < rb.num_rows(); row++) { + auto pid = partition_id_[row]; + reducer_offsets_[reducer_offset_offset_[pid]] = row; + _mm_prefetch(reducer_offsets_.data() + reducer_offset_offset_[pid] + 32, _MM_HINT_T0); + reducer_offset_offset_[pid]++; + } + std::transform(reducer_offset_offset_.begin(), std::prev(reducer_offset_offset_.end()), + partition_id_cnt_.begin(), reducer_offset_offset_.begin(), + [](uint16_t x, int16_t y) { return x - y; }); + +#endif // for the first input record batch, scan binary arrays and large binary // arrays to get their empirical sizes @@ -922,6 +967,27 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) auto src_addr = const_cast(rb.column_data(col_idx)->buffers[1]->data()); switch (arrow::bit_width(column_type_id_[col_idx]->id())) { +#ifdef PROCESSROW +// assume batch size = 32k; reducer# = 4K; row/reducer = 8 +#define PROCESS(_CTYPE) \ + std::transform(partition_buffer_idx_offset_.begin(), \ + partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), \ + partition_buffer_idx_offset_.begin(), \ + [](uint8_t* x, int16_t y) { return x + y * sizeof(_CTYPE); }); \ + for (auto pid = 0; pid < num_partitions_; pid++) { \ + auto dst_pid_base = \ + reinterpret_cast<_CTYPE*>(partition_buffer_idx_offset_[pid]); /*32k*/ \ + auto r = reducer_offset_offset_[pid]; /*8k*/ \ + auto size = reducer_offset_offset_[pid + 1]; \ + for (r; r < size; r++) { \ + auto src_offset = reducer_offsets_[r]; /*16k*/ \ + *dst_pid_base = reinterpret_cast<_CTYPE*>(src_addr)[src_offset]; /*64k*/ \ + _mm_prefetch(&(src_addr)[src_offset * sizeof(_CTYPE) + 64], _MM_HINT_T2); \ + dst_pid_base += 1; \ + } \ + } \ + break; +#else #define PROCESS(_CTYPE) \ std::transform(partition_buffer_idx_offset_.begin(), \ partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), \ @@ -932,9 +998,10 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) auto dst_pid_base = reinterpret_cast<_CTYPE*>(partition_buffer_idx_offset_[pid]); \ *dst_pid_base = reinterpret_cast<_CTYPE*>(src_addr)[row]; \ partition_buffer_idx_offset_[pid] += sizeof(_CTYPE); \ - _mm_prefetch(&dst_pid_base[1], _MM_HINT_T0); \ + _mm_prefetch(&dst_pid_base[64 / sizeof(_CTYPE)], _MM_HINT_T0); \ } \ break; +#endif case 8: PROCESS(uint8_t) case 16: @@ -942,9 +1009,97 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) case 32: PROCESS(uint32_t) case 64: +#ifdef PROCESSAVX + std::transform( + partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(), + partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(), + [](uint8_t* x, int16_t y) { return x + y * sizeof(uint64_t); }); + for (auto pid = 0; pid < num_partitions_; pid++) { + auto dst_pid_base = + reinterpret_cast(partition_buffer_idx_offset_[pid]); /*32k*/ + auto r = reducer_offset_offset_[pid]; /*8k*/ + auto size = reducer_offset_offset_[pid + 1]; +#if 1 + for (r; r < size && (((uint64_t)dst_pid_base & 0x1f) > 0); r++) { + auto src_offset = reducer_offsets_[r]; /*16k*/ + *dst_pid_base = reinterpret_cast(src_addr)[src_offset]; /*64k*/ + _mm_prefetch(&(src_addr)[src_offset * sizeof(uint64_t) + 64], _MM_HINT_T2); + dst_pid_base += 1; + } +#if 0 + for (r; r+4(src_addr)[src_offset]; /*64k*/ + _mm_prefetch(&(src_addr)[src_offset * sizeof(uint64_t) + 64], _MM_HINT_T2); + dst_pid_base += 1; + } + } + break; +#else PROCESS(uint64_t) +#endif + #undef PROCESS case 128: // arrow::Decimal128Type::type_id +#ifdef PROCESSROW + // assume batch size = 32k; reducer# = 4K; row/reducer = 8 + std::transform( + partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(), + partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(), + [](uint8_t* x, int16_t y) { return x + y * 16; }); + for (auto pid = 0; pid < num_partitions_; pid++) { + auto dst_pid_base = + reinterpret_cast(partition_buffer_idx_offset_[pid]); /*32k*/ + auto r = reducer_offset_offset_[pid]; /*8k*/ + auto size = reducer_offset_offset_[pid + 1]; + for (r; r < size; r++) { + auto src_offset = reducer_offsets_[r]; /*16k*/ + *dst_pid_base = + reinterpret_cast(src_addr)[src_offset << 1]; /*128k*/ + *(dst_pid_base + 1) = + reinterpret_cast(src_addr)[src_offset << 1 | 1]; /*128k*/ + _mm_prefetch(&(src_addr)[src_offset * 16 + 64], _MM_HINT_T2); + dst_pid_base += 2; + } + } + break; +#else std::transform( partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(), @@ -960,6 +1115,7 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) _MM_HINT_T0); } break; +#endif case 1: // arrow::BooleanType::type_id: partition_buffer_idx_offset.resize(partition_buffer_idx_base_.size()); std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(), @@ -1159,6 +1315,8 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch& if (rb.column_data(col_idx)->GetNullCount() == 0 && column_has_null_[col_idx] == true) { // if the input record batch doesn't have null, set validity to True + // column_has_null_ is used to skip the partition_id_cnt_[pid] and dst_addrs[pid] + // access for (auto pid = 0; pid < num_partitions_; ++pid) { if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] != nullptr) { arrow::BitUtil::SetBitsTo(dst_addrs[pid], partition_buffer_idx_base_[pid], @@ -1406,7 +1564,13 @@ arrow::Status HashSplitter::ComputeAndCountPartitionId(const arrow::RecordBatch& for (auto i = 0; i < num_rows; ++i) { // positive mod auto pid = pid_arr->Value(i) % num_partitions_; - if (pid < 0) pid = (pid + num_partitions_) % num_partitions_; + // force to generate ASM + __asm__( + "lea (%[num_partitions],%[pid],1),%[tmp]\n" + "test %[pid],%[pid]\n" + "cmovs %[tmp],%[pid]\n" + : [ pid ] "+r"(pid) + : [ num_partitions ] "r"(num_partitions_), [ tmp ] "r"(0)); partition_id_[i] = pid; partition_id_cnt_[pid]++; } diff --git a/native-sql-engine/cpp/src/shuffle/splitter.h b/native-sql-engine/cpp/src/shuffle/splitter.h index 0dfac2f8c..2fb4bb3d4 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.h +++ b/native-sql-engine/cpp/src/shuffle/splitter.h @@ -226,6 +226,10 @@ class Splitter { // updated for each input record batch // col std::vector partition_id_; + // [num_rows] + std::vector reducer_offsets_; + // [num_partitions] + std::vector reducer_offset_offset_; // col std::vector partition_id_cnt_;