From 4c3ac7306a2a4e04604948676c04b89e5bf4421f Mon Sep 17 00:00:00 2001
From: Orri Erling
Date: Wed, 14 Dec 2022 18:43:47 -0800
Subject: [PATCH] Add cache and read-ahead to velox_tpch_benchmark (#3519)

Summary:
Adds options for query repetition and for use of a data cache and read-ahead to velox_tpch_benchmark.

Pull Request resolved: https://github.com/facebookincubator/velox/pull/3519

Reviewed By: kgpai

Differential Revision: D42053362

Pulled By: oerling

fbshipit-source-id: 66c64729598e1607704012f6d73b95b5d5ae7a63
---
 velox/benchmarks/tpch/TpchBenchmark.cpp | 79 +++++++++++++++++++------
 1 file changed, 60 insertions(+), 19 deletions(-)

diff --git a/velox/benchmarks/tpch/TpchBenchmark.cpp b/velox/benchmarks/tpch/TpchBenchmark.cpp
index 9423924b1713..4a4c875e7b59 100644
--- a/velox/benchmarks/tpch/TpchBenchmark.cpp
+++ b/velox/benchmarks/tpch/TpchBenchmark.cpp
@@ -20,6 +20,7 @@
 
 #include "velox/common/base/SuccinctPrinter.h"
 #include "velox/common/file/FileSystems.h"
+#include "velox/common/memory/MmapAllocator.h"
 #include "velox/connectors/hive/HiveConnector.h"
 #include "velox/dwio/common/Options.h"
 #include "velox/dwio/dwrf/reader/DwrfReader.h"
@@ -90,6 +91,12 @@ DEFINE_bool(use_native_parquet_reader, true, "Use Native Parquet Reader");
 DEFINE_int32(num_drivers, 4, "Number of drivers");
 DEFINE_string(data_format, "parquet", "Data format");
 DEFINE_int32(num_splits_per_file, 10, "Number of splits per file");
+DEFINE_int32(
+    cache_gb,
+    0,
+    "GB of process memory for cache and query; if "
+    "non-zero, uses the mmap allocator and an in-process data cache.");
+DEFINE_int32(num_repeats, 1, "Number of times to run each query");
 
 DEFINE_validator(data_path, &notEmpty);
 DEFINE_validator(data_format, &validateDataFormat);
@@ -97,6 +104,18 @@
 class TpchBenchmark {
  public:
   void initialize() {
+    if (FLAGS_cache_gb) {
+      int64_t memoryBytes = FLAGS_cache_gb * (1LL << 30);
+      memory::MmapAllocatorOptions options;
+      options.capacity = memoryBytes;
+      options.useMmapArena = true;
+      options.mmapArenaCapacityRatio = 1;
+
+      auto allocator = std::make_shared<memory::MmapAllocator>(options);
+      allocator_ = std::make_shared<cache::AsyncDataCache>(
+          allocator, memoryBytes, nullptr);
+      memory::MemoryAllocator::setDefaultInstance(allocator_.get());
+    }
     functions::prestosql::registerAllScalarFunctions();
     aggregate::prestosql::registerAllAggregateFunctions();
     parse::registerTypeResolver();
@@ -107,38 +126,56 @@ class TpchBenchmark {
       parquet::registerParquetReaderFactory(parquet::ParquetReaderType::DUCKDB);
     }
     dwrf::registerDwrfReaderFactory();
+    ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(8);
+
     auto hiveConnector =
         connector::getConnectorFactory(
             connector::hive::HiveConnectorFactory::kHiveConnectorName)
-            ->newConnector(kHiveConnectorId, nullptr);
+            ->newConnector(kHiveConnectorId, nullptr, ioExecutor_.get());
     connector::registerConnector(hiveConnector);
   }
 
   std::pair<std::unique_ptr<TaskCursor>, std::vector<RowVectorPtr>> run(
       const TpchPlan& tpchPlan) {
-    CursorParameters params;
-    params.maxDrivers = FLAGS_num_drivers;
-    params.planNode = tpchPlan.plan;
-    const int numSplitsPerFile = FLAGS_num_splits_per_file;
-
-    bool noMoreSplits = false;
-    auto addSplits = [&](exec::Task* task) {
-      if (!noMoreSplits) {
-        for (const auto& entry : tpchPlan.dataFiles) {
-          for (const auto& path : entry.second) {
-            auto const splits = HiveConnectorTestBase::makeHiveConnectorSplits(
-                path, numSplitsPerFile, tpchPlan.dataFileFormat);
-            for (const auto& split : splits) {
-              task->addSplit(entry.first, exec::Split(split));
+    int32_t repeat = 0;
+    try {
+      for (;;) {
+        CursorParameters params;
+        params.maxDrivers = FLAGS_num_drivers;
+        params.planNode = tpchPlan.plan;
+        const int numSplitsPerFile = FLAGS_num_splits_per_file;
+
+        bool noMoreSplits = false;
+        auto addSplits = [&](exec::Task* task) {
+          if (!noMoreSplits) {
+            for (const auto& entry : tpchPlan.dataFiles) {
+              for (const auto& path : entry.second) {
+                auto const splits =
+                    HiveConnectorTestBase::makeHiveConnectorSplits(
+                        path, numSplitsPerFile, tpchPlan.dataFileFormat);
+                for (const auto& split : splits) {
+                  task->addSplit(entry.first, exec::Split(split));
+                }
+              }
+              task->noMoreSplits(entry.first);
             }
           }
-          task->noMoreSplits(entry.first);
+          noMoreSplits = true;
+        };
+        auto result = readCursor(params, addSplits);
+        ensureTaskCompletion(result.first->task().get());
+        if (++repeat >= FLAGS_num_repeats) {
+          return result;
         }
       }
-      noMoreSplits = true;
-    };
-    return readCursor(params, addSplits);
+    } catch (const std::exception& e) {
+      LOG(ERROR) << "Query terminated with: " << e.what();
+      return {nullptr, {}};
+    }
   }
+
+  std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
+  std::shared_ptr<memory::MemoryAllocator> allocator_;
 };
 
 TpchBenchmark benchmark;
@@ -240,6 +277,10 @@ int main(int argc, char** argv) {
   } else {
     const auto queryPlan = queryBuilder->getQueryPlan(FLAGS_run_query_verbose);
     const auto [cursor, actualResults] = benchmark.run(queryPlan);
+    if (!cursor) {
+      LOG(ERROR) << "Query terminated with error. Exiting";
+      exit(1);
+    }
     auto task = cursor->task();
     ensureTaskCompletion(task.get());
     if (FLAGS_include_results) {
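
Usage note (illustrative, not part of the patch; the data path and query number below are placeholders): with these options applied, a cached, repeated run can be requested on the command line, for example

    velox_tpch_benchmark --data_path=/path/to/tpch-parquet --data_format=parquet \
        --run_query_verbose=6 --cache_gb=16 --num_repeats=3

A non-zero --cache_gb installs the MmapAllocator-backed AsyncDataCache as the default memory allocator, and --num_repeats reruns the same query plan, so repetitions after the first can be served from the warmed in-process cache rather than from storage.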