Skip to content

Commit

Permalink
Add cache and read-ahead to velox_tpch_benchmark (#3519)
Browse files Browse the repository at this point in the history
Summary:
Adds options to velox_tpch_benchmark for repeating each query and for enabling the in-process data cache and read-ahead.

Pull Request resolved: #3519

Reviewed By: kgpai

Differential Revision: D42053362

Pulled By: oerling

fbshipit-source-id: 66c64729598e1607704012f6d73b95b5d5ae7a63
  • Loading branch information
Orri Erling authored and facebook-github-bot committed Dec 15, 2022
1 parent 1f0e35c commit 4c3ac73
Showing 1 changed file with 60 additions and 19 deletions.
79 changes: 60 additions & 19 deletions velox/benchmarks/tpch/TpchBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "velox/common/base/SuccinctPrinter.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/MmapAllocator.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/dwrf/reader/DwrfReader.h"
Expand Down Expand Up @@ -90,13 +91,31 @@ DEFINE_bool(use_native_parquet_reader, true, "Use Native Parquet Reader");
DEFINE_int32(num_drivers, 4, "Number of drivers");
DEFINE_string(data_format, "parquet", "Data format");
DEFINE_int32(num_splits_per_file, 10, "Number of splits per file");
DEFINE_int32(
cache_gb,
0,
"GB of process memory for cache and query.. if "
"non-0, uses mmap to allocator and in-process data cache.");
DEFINE_int32(num_repeats, 1, "Number of times to run each query");

DEFINE_validator(data_path, &notEmpty);
DEFINE_validator(data_format, &validateDataFormat);

class TpchBenchmark {
public:
void initialize() {
if (FLAGS_cache_gb) {
int64_t memoryBytes = FLAGS_cache_gb * (1LL << 30);
memory::MmapAllocatorOptions options;
options.capacity = memoryBytes;
options.useMmapArena = true;
options.mmapArenaCapacityRatio = 1;

auto allocator = std::make_shared<memory::MmapAllocator>(options);
allocator_ = std::make_shared<cache::AsyncDataCache>(
allocator, memoryBytes, nullptr);
memory::MemoryAllocator::setDefaultInstance(allocator_.get());
}
functions::prestosql::registerAllScalarFunctions();
aggregate::prestosql::registerAllAggregateFunctions();
parse::registerTypeResolver();
Expand All @@ -107,38 +126,56 @@ class TpchBenchmark {
parquet::registerParquetReaderFactory(parquet::ParquetReaderType::DUCKDB);
}
dwrf::registerDwrfReaderFactory();
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(8);

auto hiveConnector =
connector::getConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)
->newConnector(kHiveConnectorId, nullptr);
->newConnector(kHiveConnectorId, nullptr, ioExecutor_.get());
connector::registerConnector(hiveConnector);
}

std::pair<std::unique_ptr<TaskCursor>, std::vector<RowVectorPtr>> run(
const TpchPlan& tpchPlan) {
CursorParameters params;
params.maxDrivers = FLAGS_num_drivers;
params.planNode = tpchPlan.plan;
const int numSplitsPerFile = FLAGS_num_splits_per_file;

bool noMoreSplits = false;
auto addSplits = [&](exec::Task* task) {
if (!noMoreSplits) {
for (const auto& entry : tpchPlan.dataFiles) {
for (const auto& path : entry.second) {
auto const splits = HiveConnectorTestBase::makeHiveConnectorSplits(
path, numSplitsPerFile, tpchPlan.dataFileFormat);
for (const auto& split : splits) {
task->addSplit(entry.first, exec::Split(split));
int32_t repeat = 0;
try {
for (;;) {
CursorParameters params;
params.maxDrivers = FLAGS_num_drivers;
params.planNode = tpchPlan.plan;
const int numSplitsPerFile = FLAGS_num_splits_per_file;

bool noMoreSplits = false;
auto addSplits = [&](exec::Task* task) {
if (!noMoreSplits) {
for (const auto& entry : tpchPlan.dataFiles) {
for (const auto& path : entry.second) {
auto const splits =
HiveConnectorTestBase::makeHiveConnectorSplits(
path, numSplitsPerFile, tpchPlan.dataFileFormat);
for (const auto& split : splits) {
task->addSplit(entry.first, exec::Split(split));
}
}
task->noMoreSplits(entry.first);
}
}
task->noMoreSplits(entry.first);
noMoreSplits = true;
};
auto result = readCursor(params, addSplits);
ensureTaskCompletion(result.first->task().get());
if (++repeat >= FLAGS_num_repeats) {
return result;
}
}
noMoreSplits = true;
};
return readCursor(params, addSplits);
} catch (const std::exception& e) {
LOG(ERROR) << "Query terminated with: " << e.what();
return {nullptr, {}};
}
}

std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
std::shared_ptr<memory::MemoryAllocator> allocator_;
};

TpchBenchmark benchmark;
Expand Down Expand Up @@ -240,6 +277,10 @@ int main(int argc, char** argv) {
} else {
const auto queryPlan = queryBuilder->getQueryPlan(FLAGS_run_query_verbose);
const auto [cursor, actualResults] = benchmark.run(queryPlan);
if (!cursor) {
LOG(ERROR) << "Query terminated with error. Exiting";
exit(1);
}
auto task = cursor->task();
ensureTaskCompletion(task.get());
if (FLAGS_include_results) {
Expand Down

0 comments on commit 4c3ac73

Please sign in to comment.