From 59fda5e4dda6301b97a2edae10a086d6767824f6 Mon Sep 17 00:00:00 2001 From: duanmeng Date: Mon, 18 Mar 2024 19:30:35 +0800 Subject: [PATCH] Add recursive spill for RowNumber --- velox/common/base/SpillConfig.h | 4 +- velox/exec/RowNumber.cpp | 146 ++++++++++----- velox/exec/RowNumber.h | 26 ++- velox/exec/tests/RowNumberTest.cpp | 290 ++++++++++++++++++++++++++--- 4 files changed, 384 insertions(+), 82 deletions(-) diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index ad5f4b3ab5b48..0ed379c5a219c 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -70,8 +70,8 @@ struct SpillConfig { /// the next level of recursive spilling. int32_t spillLevel(uint8_t startBitOffset) const; - /// Checks if the given 'startBitOffset' and 'numPartitionBits' has exceeded - /// the max hash join spill limit. + /// Checks if the given 'startBitOffset' has exceeded the max hash join spill + /// limit. bool exceedSpillLevelLimit(uint8_t startBitOffset) const; /// A callback function that returns the spill directory path. Implementations diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp index 7e12b26c220cb..05403994bf735 100644 --- a/velox/exec/RowNumber.cpp +++ b/velox/exec/RowNumber.cpp @@ -65,52 +65,26 @@ RowNumber::RowNumber( resultProjections_.emplace_back(0, inputType->size()); results_.resize(1); } + + if (spillEnabled()) { + setSpillPartitionBits(); + } } void RowNumber::addInput(RowVectorPtr input) { - const auto numInput = input->size(); - if (table_) { ensureInputFits(input); - if (inputSpiller_ != nullptr) { spillInput(input, pool()); return; } - SelectivityVector rows(numInput); - table_->prepareForGroupProbe( - *lookup_, - input, - rows, - false, - BaseHashTable::kNoSpillInputStartPartitionBit); - table_->groupProbe(*lookup_); - - // Initialize new partitions with zeros. 
- for (auto i : lookup_->newGroups) { - setNumRows(lookup_->hits[i], 0); - } + addInputInternal(input); } input_ = std::move(input); } -void RowNumber::addSpillInput() { - const auto numInput = input_->size(); - SelectivityVector rows(numInput); - table_->prepareForGroupProbe( - *lookup_, input_, rows, false, spillConfig_->startPartitionBit); - table_->groupProbe(*lookup_); - - // Initialize new partitions with zeros. - for (auto i : lookup_->newGroups) { - setNumRows(lookup_->hits[i], 0); - } - - // TODO Add support for recursive spilling. -} - void RowNumber::noMoreInput() { Operator::noMoreInput(); @@ -135,6 +109,8 @@ void RowNumber::restoreNextSpillPartition() { if (hashTableIt != spillHashTablePartitionSet_.end()) { spillHashTableReader_ = hashTableIt->second->createUnorderedReader(pool()); + setSpillPartitionBits(&(it->first)); + RowVectorPtr data; while (spillHashTableReader_->nextBatch(data)) { // 'data' contains partition-by keys and count. Transform 'data' to match @@ -168,7 +144,7 @@ void RowNumber::restoreNextSpillPartition() { spillInputPartitionSet_.erase(it); spillInputReader_->nextBatch(input_); - addSpillInput(); + addInputInternal(input_, true); } void RowNumber::ensureInputFits(const RowVectorPtr& input) { @@ -251,7 +227,19 @@ FlatVector& RowNumber::getOrCreateRowNumberVector(vector_size_t size) { RowVectorPtr RowNumber::getOutput() { if (input_ == nullptr) { - return nullptr; + if (spillInputReader_ == nullptr) { + return nullptr; + } + + if (yield_) { + VELOX_CHECK_NULL(input_); + return nullptr; + } + + recursiveSpillInput(); + if (input_ == nullptr) { + return nullptr; + } } if (!table_) { @@ -307,7 +295,7 @@ RowVectorPtr RowNumber::getOutput() { if (spillInputReader_ != nullptr) { if (spillInputReader_->nextBatch(input_)) { - addSpillInput(); + addInputInternal(input_, true); } else { input_ = nullptr; spillInputReader_ = nullptr; @@ -368,8 +356,14 @@ void RowNumber::reclaim( return; } - if (inputSpiller_ != nullptr) { - // Already spilled. 
+ if (exceededMaxSpillLevelLimit_) { + LOG(WARNING) << "Exceeded row spill level limit: " + << spillConfig_->maxSpillLevel + << ", and abandon spilling for memory pool: " + << pool()->name(); + common::SpillStats spillStats; + spillStats.spillMaxLevelExceededCount = 1; + Operator::recordSpillStats(spillStats); return; } @@ -380,8 +374,6 @@ SpillPartitionNumSet RowNumber::spillHashTable() { // TODO Replace joinPartitionBits and Spiller::Type::kHashJoinBuild. VELOX_CHECK_NOT_NULL(table_); - const auto& spillConfig = spillConfig_.value(); - auto columnTypes = table_->rows()->columnTypes(); auto tableType = ROW(std::move(columnTypes)); @@ -390,7 +382,7 @@ SpillPartitionNumSet RowNumber::spillHashTable() { table_->rows(), tableType, spillPartitionBits_, - &spillConfig); + &spillConfig_.value()); hashTableSpiller->spill(); hashTableSpiller->finishSpill(spillHashTablePartitionSet_); @@ -429,16 +421,11 @@ void RowNumber::setupInputSpiller( void RowNumber::spill() { VELOX_CHECK(spillEnabled()); - VELOX_CHECK_NULL(inputSpiller_); - - spillPartitionBits_ = HashBitRange( - spillConfig_->startPartitionBit, - spillConfig_->startPartitionBit + spillConfig_->numPartitionBits); const auto spillPartitionSet = spillHashTable(); + VELOX_CHECK_EQ(table_->numDistinct(), 0); setupInputSpiller(spillPartitionSet); - if (input_ != nullptr) { spillInput(input_, memory::spillMemoryPool()); input_ = nullptr; @@ -488,4 +475,73 @@ void RowNumber::spillInput( } } +BlockingReason RowNumber::isBlocked(ContinueFuture* /* unused */) { + const auto reason = + yield_ ? BlockingReason::kYield : BlockingReason::kNotBlocked; + yield_ = false; + return reason; +} + +void RowNumber::addInputInternal(const RowVectorPtr& input, bool fromSpill) { + if (fromSpill) { + ensureInputFits(input); + if (input == nullptr) { + return; + } + } + + const auto numInput = input->size(); + SelectivityVector rows(numInput); + + table_->prepareForGroupProbe( + *lookup_, + input, + rows, + false, + fromSpill ? 
spillConfig_->startPartitionBit + : BaseHashTable::kNoSpillInputStartPartitionBit); + table_->groupProbe(*lookup_); + + // Initialize new partitions with zeros. + for (auto i : lookup_->newGroups) { + setNumRows(lookup_->hits[i], 0); + } +} + +void RowNumber::recursiveSpillInput() { + RowVectorPtr input; + while (spillInputReader_->nextBatch(input)) { + spillInput(input, pool()); + + if (operatorCtx_->driver()->shouldYield()) { + yield_ = true; + return; + } + } + + inputSpiller_->finishSpill(spillInputPartitionSet_); + recordSpillStats(inputSpiller_->stats()); + spillInputReader_ = nullptr; + + removeEmptyPartitions(spillInputPartitionSet_); + restoreNextSpillPartition(); +} + +void RowNumber::setSpillPartitionBits( + const SpillPartitionId* restoredPartitionId) { + const auto startPartitionBitOffset = restoredPartitionId == nullptr + ? spillConfig_->startPartitionBit + : restoredPartitionId->partitionBitOffset() + + spillConfig_->numPartitionBits; + if (spillConfig_->exceedSpillLevelLimit(startPartitionBitOffset)) { + exceededMaxSpillLevelLimit_ = true; + return; + } + + exceededMaxSpillLevelLimit_ = false; + spillPartitionBits_ = HashBitRange( + startPartitionBitOffset, + startPartitionBitOffset + spillConfig_->numPartitionBits); +} + } // namespace facebook::velox::exec diff --git a/velox/exec/RowNumber.h b/velox/exec/RowNumber.h index 0038641e55a02..cae2724af5722 100644 --- a/velox/exec/RowNumber.h +++ b/velox/exec/RowNumber.h @@ -38,9 +38,7 @@ class RowNumber : public Operator { return !noMoreInput_ && !finishedEarly_; } - BlockingReason isBlocked(ContinueFuture* /* unused */) override { - return BlockingReason::kNotBlocked; - } + BlockingReason isBlocked(ContinueFuture* /* unused */) override; bool isFinished() override { return (noMoreInput_ && input_ == nullptr && @@ -64,7 +62,9 @@ class RowNumber : public Operator { void spill(); - void addSpillInput(); + // Probes the hash 'table_' with input. 
If 'fromSpill' is true, the input is + // read from the spilled input, otherwise from the source. + void addInputInternal(const RowVectorPtr& input, bool fromSpill = false); void restoreNextSpillPartition(); @@ -78,6 +78,18 @@ class RowNumber : public Operator { FlatVector& getOrCreateRowNumberVector(vector_size_t size); + // Used by recursive spill processing to read the spilled input data from the + // previous spill run through 'spillInputReader_' and then spill them back + // into a number of sub-partitions. After that, the function restores one of + // the newly spilled partitions and resets 'spillInputReader_' accordingly. + void recursiveSpillInput(); + + // Set 'spillPartitionBits_' used for (recursive) spill. + // If 'restoredPartitionId' is not nullptr, use it to set the + // 'spillPartitionBits_', otherwise use 'spillConfig_'. + void setSpillPartitionBits( + const SpillPartitionId* restoredPartitionId = nullptr); + const std::optional limit_; const bool generateRowNumber_; @@ -117,5 +129,11 @@ class RowNumber : public Operator { // Used to calculate the spill partition numbers of the inputs. std::unique_ptr spillHashFunction_; + + // The operator may voluntarily yield the CPU after running too long when + // processing input from a spilled file. 
+ bool yield_; + + bool exceededMaxSpillLevelLimit_{false}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/RowNumberTest.cpp b/velox/exec/tests/RowNumberTest.cpp index 6ec773d2c4774..07262e39fc4b6 100644 --- a/velox/exec/tests/RowNumberTest.cpp +++ b/velox/exec/tests/RowNumberTest.cpp @@ -216,7 +216,7 @@ TEST_F(RowNumberTest, spill) { for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); - TestScopedSpillInjection scopedSpillInjection(100); + TestScopedSpillInjection scopedSpillInjection(100, 1); core::PlanNodeId rowNumberPlanNodeId; auto task = @@ -273,7 +273,7 @@ TEST_F(RowNumberTest, maxSpillBytes) { for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); try { - TestScopedSpillInjection scopedSpillInjection(100); + TestScopedSpillInjection scopedSpillInjection(100, 1); AssertQueryBuilder(plan) .spillDirectory(spillDirectory->path) .queryCtx(queryCtx) @@ -306,39 +306,267 @@ TEST_F(RowNumberTest, memoryUsage) { .project({"c0", "c1"}) .planNode(); - int64_t peakBytesWithSpilling = 0; - int64_t peakBytesWithOutSpilling = 0; - - for (const auto& spillEnable : {false, true}) { - auto queryCtx = std::make_shared(executor_.get()); - auto spillDirectory = exec::test::TempDirectoryPath::create(); - const std::string spillEnableConfig = std::to_string(spillEnable); - - std::shared_ptr task; - TestScopedSpillInjection scopedSpillInjection(100); - AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) - .queryCtx(queryCtx) - .config(core::QueryConfig::kSpillEnabled, spillEnableConfig) - .config(core::QueryConfig::kRowNumberSpillEnabled, spillEnableConfig) - .spillDirectory(spillDirectory->path) - .copyResults(pool_.get(), task); - - if (spillEnable) { - peakBytesWithSpilling = queryCtx->pool()->peakBytes(); - auto taskStats = exec::toPlanStats(task->taskStats()); - const auto& stats = taskStats.at(rowNumberId); - - ASSERT_GT(stats.spilledBytes, 0); - ASSERT_GT(stats.spilledRows, 0); - 
ASSERT_GT(stats.spilledFiles, 0); - ASSERT_GT(stats.spilledPartitions, 0); + struct { + uint8_t numSpills; + + std::string debugString() const { + return fmt::format("numSpills {}", numSpills); + } + } testSettings[] = {{1}, {3}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + int64_t peakBytesWithSpilling = 0; + int64_t peakBytesWithOutSpilling = 0; + + for (const auto& spillEnable : {false, true}) { + auto queryCtx = std::make_shared(executor_.get()); + auto spillDirectory = exec::test::TempDirectoryPath::create(); + const std::string spillEnableConfig = std::to_string(spillEnable); + + std::shared_ptr task; + TestScopedSpillInjection scopedSpillInjection(100, testData.numSpills); + AssertQueryBuilder(plan) + .spillDirectory(spillDirectory->path) + .queryCtx(queryCtx) + .config(core::QueryConfig::kSpillEnabled, spillEnableConfig) + .config(core::QueryConfig::kRowNumberSpillEnabled, spillEnableConfig) + .spillDirectory(spillDirectory->path) + .copyResults(pool_.get(), task); + + if (spillEnable) { + peakBytesWithSpilling = queryCtx->pool()->peakBytes(); + auto taskStats = exec::toPlanStats(task->taskStats()); + const auto& stats = taskStats.at(rowNumberId); + + ASSERT_GT(stats.spilledBytes, 0); + ASSERT_GT(stats.spilledRows, 0); + ASSERT_GT(stats.spilledFiles, 0); + ASSERT_GT(stats.spilledPartitions, 0); + } else { + peakBytesWithOutSpilling = queryCtx->pool()->peakBytes(); + } + } + + ASSERT_GE(peakBytesWithOutSpilling / peakBytesWithSpilling, 2); + } +} + +DEBUG_ONLY_TEST_F(RowNumberTest, spillOnlyDuringInputOrOutput) { + std::vector vectors = createVectors(8, rowType_, fuzzerOpts_); + createDuckDbTable(vectors); + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + auto queryCtx = std::make_shared(executor_.get()); + + struct { + std::string spillInjectionPoint; + uint32_t spillPartitionBits; + + std::string debugString() const { + return fmt::format( + "Spill during {}, spillPartitionBits {}", + 
spillInjectionPoint, + spillPartitionBits); + } + } testSettings[] = { + {"facebook::velox::exec::Driver::runInternal::addInput", 2}, + {"facebook::velox::exec::Driver::runInternal::getOutput", 2}, + {"facebook::velox::exec::Driver::runInternal::addInput", 3}, + {"facebook::velox::exec::Driver::runInternal::getOutput", 3}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + std::atomic_int numRound{0}; + SCOPED_TESTVALUE_SET( + testData.spillInjectionPoint, + std::function(([&](Operator* op) { + if (op->operatorType() != "RowNumber") { + return; + } + + if (++numRound != 8) { + return; + } + + testingRunArbitration(op->pool(), 0); + }))); + + core::PlanNodeId rowNumberPlanNodeId; + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kRowNumberSpillEnabled, true) + .config( + core::QueryConfig::kSpillNumPartitionBits, + testData.spillPartitionBits) + .queryCtx(queryCtx) + .plan(PlanBuilder() + .values(vectors) + .rowNumber({"c0"}) + .capturePlanNodeId(rowNumberPlanNodeId) + .planNode()) + .assertResults( + "SELECT *, row_number() over (partition by c0) FROM tmp"); + auto taskStats = toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(rowNumberPlanNodeId); + ASSERT_GT(planStats.spilledBytes, 0); + ASSERT_EQ( + planStats.spilledPartitions, + (static_cast(1) << testData.spillPartitionBits) * 2); + ASSERT_GT(planStats.spilledFiles, 0); + ASSERT_GT(planStats.spilledRows, 0); + + task.reset(); + waitForAllTasksToBeDeleted(); + } +} + +DEBUG_ONLY_TEST_F(RowNumberTest, recursiveSpill) { + std::vector vectors = createVectors(32, rowType_, fuzzerOpts_); + createDuckDbTable(vectors); + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + auto queryCtx = std::make_shared(executor_.get()); + + struct { + int32_t numSpills; + int32_t maxSpillLevel; + + std::string debugString() const { + 
return fmt::format( + "numSpills {}, maxSpillLevel {}", numSpills, maxSpillLevel); + } + } testSettings[] = {{2, 3}, {4, 3}, {4, 4}, {8, 4}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + std::atomic_int numSpills{0}; + std::atomic_int numInputs{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::addInput", + std::function(([&](Operator* op) { + if (op->operatorType() != "RowNumber") { + return; + } + + if (++numInputs != 5) { + return; + } + + ++numSpills; + testingRunArbitration(op->pool(), 0); + }))); + + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::getOutput", + std::function(([&](Operator* op) { + if (op->operatorType() != "RowNumber") { + return; + } + + if (!op->testingNoMoreInput()) { + return; + } + + if (numSpills++ >= testData.numSpills) { + return; + } + + testingRunArbitration(op->pool(), 0); + }))); + + core::PlanNodeId rowNumberPlanNodeId; + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kRowNumberSpillEnabled, true) + .config( + core::QueryConfig::kMaxSpillLevel, testData.maxSpillLevel - 1) + .queryCtx(queryCtx) + .plan(PlanBuilder() + .values(vectors) + .rowNumber({"c0"}) + .capturePlanNodeId(rowNumberPlanNodeId) + .planNode()) + .assertResults( + "SELECT *, row_number() over (partition by c0) FROM tmp"); + auto taskStats = toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(rowNumberPlanNodeId); + ASSERT_GT(planStats.spilledBytes, 0); + ASSERT_EQ( + planStats.spilledPartitions, + 8 * 2 * std::min(testData.numSpills, testData.maxSpillLevel)); + ASSERT_GT(planStats.spilledFiles, 0); + ASSERT_GT(planStats.spilledRows, 0); + + auto runTimeStats = + task->taskStats().pipelineStats.back().operatorStats.at(1).runtimeStats; + if (testData.numSpills > testData.maxSpillLevel) { + ASSERT_EQ( + 
runTimeStats["exceededMaxSpillLevel"].sum, + testData.numSpills - testData.maxSpillLevel); + ASSERT_EQ( + runTimeStats["exceededMaxSpillLevel"].count, + testData.numSpills - testData.maxSpillLevel); } else { - peakBytesWithOutSpilling = queryCtx->pool()->peakBytes(); + ASSERT_EQ(runTimeStats.count("exceededMaxSpillLevel"), 0); } + + task.reset(); + waitForAllTasksToBeDeleted(); } +} + +TEST_F(RowNumberTest, spillWithYield) { + std::vector vectors = createVectors(8, rowType_, fuzzerOpts_); + createDuckDbTable(vectors); + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + auto queryCtx = std::make_shared(executor_.get()); + + struct { + uint32_t numSpills; + uint32_t cpuTimeSliceLimitMs; + + std::string debugString() const { + return fmt::format( + "numSpills {}, cpuTimeSliceLimitMs {}", + numSpills, + cpuTimeSliceLimitMs); + } + } testSettings[] = {{2, 0}, {2, 10}, {3, 0}, {3, 10}, {8, 10}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + TestScopedSpillInjection scopedSpillInjection(100, testData.numSpills); + + core::PlanNodeId rowNumberPlanNodeId; + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kRowNumberSpillEnabled, true) + .config( + core::QueryConfig::kDriverCpuTimeSliceLimitMs, + testData.cpuTimeSliceLimitMs) + .queryCtx(queryCtx) + .plan(PlanBuilder() + .values(vectors) + .rowNumber({"c0"}) + .capturePlanNodeId(rowNumberPlanNodeId) + .planNode()) + .assertResults( + "SELECT *, row_number() over (partition by c0) FROM tmp"); + auto taskStats = toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(rowNumberPlanNodeId); + ASSERT_GT(planStats.spilledBytes, 0); + ASSERT_GT(planStats.spilledFiles, 0); + ASSERT_GT(planStats.spilledRows, 0); - ASSERT_GE(peakBytesWithOutSpilling / peakBytesWithSpilling, 2); + task.reset(); + waitForAllTasksToBeDeleted(); + } } } // 
namespace facebook::velox::exec::test