Skip to content

Commit

Permalink
Add recursive spill for RowNumber
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Mar 30, 2024
1 parent 0c86a0a commit 9106413
Show file tree
Hide file tree
Showing 5 changed files with 367 additions and 61 deletions.
4 changes: 2 additions & 2 deletions velox/common/base/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ struct SpillConfig {
/// the next level of recursive spilling.
int32_t spillLevel(uint8_t startBitOffset) const;

/// Checks if the given 'startBitOffset' and 'numPartitionBits' has exceeded
/// the max hash join spill limit.
/// Checks if the given 'startBitOffset' has exceeded the max hash join spill
/// limit.
bool exceedSpillLevelLimit(uint8_t startBitOffset) const;

/// A callback function that returns the spill directory path. Implementations
Expand Down
2 changes: 1 addition & 1 deletion velox/docs/develop/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ next batch of input.

This operator accumulates state: a hash table mapping partition keys to total
number of rows seen in this partition so far. Returning the row numbers as
a column in the output is optional. This operator doesn't support spilling yet.
a column in the output is optional. This operator supports spilling.

This operator is equivalent to a WindowNode followed by
FilterNode(row_number <= limit), but it uses less memory and CPU and makes
Expand Down
113 changes: 87 additions & 26 deletions velox/exec/RowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ RowNumber::RowNumber(
resultProjections_.emplace_back(0, inputType->size());
results_.resize(1);
}

if (spillEnabled()) {
setSpillPartitionBits();
}
}

void RowNumber::addInput(RowVectorPtr input) {
Expand Down Expand Up @@ -96,21 +100,6 @@ void RowNumber::addInput(RowVectorPtr input) {
input_ = std::move(input);
}

// Probes 'table_' with the current 'input_' batch so that every partition key
// in the batch has a group entry, and resets the row count of each group that
// the probe reports as newly created.
void RowNumber::addSpillInput() {
const auto numInput = input_->size();
// Selectivity vector sized to the whole batch.
SelectivityVector rows(numInput);
// The probe is prepared with the configured spill start partition bit.
table_->prepareForGroupProbe(
*lookup_, input_, rows, false, spillConfig_->startPartitionBit);
table_->groupProbe(*lookup_);

// Initialize new partitions with zeros: groups listed in 'newGroups' were
// created by this probe and have not counted any rows yet.
for (auto i : lookup_->newGroups) {
setNumRows(lookup_->hits[i], 0);
}

// TODO Add support for recursive spilling.
}

void RowNumber::noMoreInput() {
Operator::noMoreInput();

Expand All @@ -134,6 +123,8 @@ void RowNumber::restoreNextSpillPartition() {
if (hashTableIt != spillHashTablePartitionSet_.end()) {
spillHashTableReader_ = hashTableIt->second->createUnorderedReader(pool());

setSpillPartitionBits(&(it->first));

RowVectorPtr data;
while (spillHashTableReader_->nextBatch(data)) {
// 'data' contains partition-by keys and count. Transform 'data' to match
Expand Down Expand Up @@ -250,7 +241,18 @@ FlatVector<int64_t>& RowNumber::getOrCreateRowNumberVector(vector_size_t size) {

RowVectorPtr RowNumber::getOutput() {
if (input_ == nullptr) {
return nullptr;
if (spillInputReader_ == nullptr) {
return nullptr;
}

if (yield_) {
return nullptr;
}

recursiveSpillInput();
if (input_ == nullptr) {
return nullptr;
}
}

if (!table_) {
Expand Down Expand Up @@ -367,8 +369,12 @@ void RowNumber::reclaim(
return;
}

if (inputSpiller_ != nullptr) {
// Already spilled.
if (exceededMaxSpillLevelLimit_) {
LOG(WARNING) << "Exceeded row spill level limit: "
<< spillConfig_->maxSpillLevel
<< ", and abandon spilling for memory pool: "
<< pool()->name();
++spillStats_.wlock()->spillMaxLevelExceededCount;
return;
}

Expand All @@ -379,10 +385,9 @@ SpillPartitionNumSet RowNumber::spillHashTable() {
// TODO Replace joinPartitionBits and Spiller::Type::kHashJoinBuild.
VELOX_CHECK_NOT_NULL(table_);

const auto& spillConfig = spillConfig_.value();

auto columnTypes = table_->rows()->columnTypes();
auto tableType = ROW(std::move(columnTypes));
const auto& spillConfig = spillConfig_.value();

auto hashTableSpiller = std::make_unique<Spiller>(
Spiller::Type::kRowNumber,
Expand Down Expand Up @@ -429,16 +434,11 @@ void RowNumber::setupInputSpiller(

void RowNumber::spill() {
VELOX_CHECK(spillEnabled());
VELOX_CHECK_NULL(inputSpiller_);

spillPartitionBits_ = HashBitRange(
spillConfig_->startPartitionBit,
spillConfig_->startPartitionBit + spillConfig_->numPartitionBits);

const auto spillPartitionSet = spillHashTable();
VELOX_CHECK_EQ(table_->numDistinct(), 0);

setupInputSpiller(spillPartitionSet);

if (input_ != nullptr) {
spillInput(input_, memory::spillMemoryPool());
input_ = nullptr;
Expand Down Expand Up @@ -488,4 +488,65 @@ void RowNumber::spillInput(
}
}

// Reports a pending yield request exactly once. If 'yield_' is set, the flag
// is cleared and kYield is returned; otherwise the operator is not blocked.
// The future argument is not used.
BlockingReason RowNumber::isBlocked(ContinueFuture* /* unused */) {
  if (!yield_) {
    return BlockingReason::kNotBlocked;
  }

  // Consume the request so that the next call reports kNotBlocked.
  yield_ = false;
  return BlockingReason::kYield;
}

void RowNumber::addSpillInput() {
ensureInputFits(input_);
if (input_ == nullptr) {
return;
}

const auto numInput = input_->size();
SelectivityVector rows(numInput);

table_->prepareForGroupProbe(
*lookup_, input_, rows, false, spillConfig_->startPartitionBit);
table_->groupProbe(*lookup_);

// Initialize new partitions with zeros.
for (auto i : lookup_->newGroups) {
setNumRows(lookup_->hits[i], 0);
}
}

void RowNumber::recursiveSpillInput() {
RowVectorPtr input;
while (spillInputReader_->nextBatch(input)) {
spillInput(input, pool());

if (operatorCtx_->driver()->shouldYield()) {
yield_ = true;
return;
}
}

inputSpiller_->finishSpill(spillInputPartitionSet_);
spillInputReader_ = nullptr;

removeEmptyPartitions(spillInputPartitionSet_);
restoreNextSpillPartition();
}

// Chooses the hash bit range used for the next (possibly recursive) spill.
//
// With no restored partition the range starts at the configured
// 'startPartitionBit'. Otherwise it starts 'numPartitionBits' past the
// restored partition's bit offset. If that start offset exceeds the spill
// level limit, only 'exceededMaxSpillLevelLimit_' is set and
// 'spillPartitionBits_' is left untouched.
void RowNumber::setSpillPartitionBits(
    const SpillPartitionId* restoredPartitionId) {
  const auto firstBit = restoredPartitionId != nullptr
      ? restoredPartitionId->partitionBitOffset() +
          spillConfig_->numPartitionBits
      : spillConfig_->startPartitionBit;

  exceededMaxSpillLevelLimit_ = spillConfig_->exceedSpillLevelLimit(firstBit);
  if (exceededMaxSpillLevelLimit_) {
    return;
  }

  spillPartitionBits_ =
      HashBitRange(firstBit, firstBit + spillConfig_->numPartitionBits);
}

} // namespace facebook::velox::exec
26 changes: 23 additions & 3 deletions velox/exec/RowNumber.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ class RowNumber : public Operator {
return !noMoreInput_ && !finishedEarly_;
}

BlockingReason isBlocked(ContinueFuture* /* unused */) override {
return BlockingReason::kNotBlocked;
}
BlockingReason isBlocked(ContinueFuture* /* unused */) override;

bool isFinished() override {
return (noMoreInput_ && input_ == nullptr &&
Expand Down Expand Up @@ -78,6 +76,22 @@ class RowNumber : public Operator {

FlatVector<int64_t>& getOrCreateRowNumberVector(vector_size_t size);

// Used by recursive spill processing to read the spilled input data from the
// previous spill run through 'spillInputReader_' and then spill them back
// into a number of sub-partitions. After that, the function restores one of
// the newly spilled partitions and resets 'spillInputReader_' accordingly.
void recursiveSpillInput();

// Set 'spillPartitionBits_' for (recursive) spill. If 'restoredPartitionId'
// is not null, use it to set 'spillPartitionBits_', otherwise use
// 'spillConfig_'. If the new 'spillPartitionBits_' exceeds the
// 'maxSpillLevel', set 'exceededMaxSpillLevelLimit_' to true.
// NOTE: we don't increment 'spillMaxLevelExceededCount' here, as the actual
// increment happens in the 'reclaim()' method if
// 'exceededMaxSpillLevelLimit_' is true.
void setSpillPartitionBits(
const SpillPartitionId* restoredPartitionId = nullptr);

const std::optional<int32_t> limit_;
const bool generateRowNumber_;

Expand Down Expand Up @@ -117,5 +131,11 @@ class RowNumber : public Operator {

// Used to calculate the spill partition numbers of the inputs.
std::unique_ptr<HashPartitionFunction> spillHashFunction_;

// Set when the operator voluntarily yields the CPU after running for too long
// while processing input from spilled files. isBlocked() reads and clears it,
// so it must start out false: without an initializer the first read would see
// an indeterminate value.
bool yield_{false};

bool exceededMaxSpillLevelLimit_{false};
};
} // namespace facebook::velox::exec
Loading

0 comments on commit 9106413

Please sign in to comment.