refactor
duanmeng committed Mar 23, 2024
1 parent 1d95a47 commit 8f801ac
Showing 34 changed files with 192 additions and 185 deletions.
11 changes: 6 additions & 5 deletions velox/connectors/hive/HiveDataSink.cpp
@@ -510,8 +510,9 @@ DataSink::Stats HiveDataSink::stats() const {
for (int i = 0; i < writerInfo_.size(); ++i) {
const auto& info = writerInfo_.at(i);
VELOX_CHECK_NOT_NULL(info);
if (!info->spillStats->empty()) {
stats.spillStats += *info->spillStats;
const auto spillStats = info->spillStats->rlock();
if (!spillStats->empty()) {
stats.spillStats += *spillStats;
}
}
return stats;
@@ -719,15 +720,15 @@ HiveDataSink::maybeCreateBucketSortWriter(
sortCompareFlags_,
sortPool,
writerInfo_.back()->nonReclaimableSectionHolder.get(),
spillConfig_);
spillConfig_,
writerInfo_.back()->spillStats.get());
return std::make_unique<dwio::common::SortingWriter>(
std::move(writer),
std::move(sortBuffer),
hiveConfig_->sortWriterMaxOutputRows(
connectorQueryCtx_->sessionProperties()),
hiveConfig_->sortWriterMaxOutputBytes(
connectorQueryCtx_->sessionProperties()),
writerInfo_.back()->spillStats.get());
connectorQueryCtx_->sessionProperties()));
}

void HiveDataSink::splitInputRowsAndEnsureWriters() {
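
The stats() hunk above switches from dereferencing a plain SpillStats pointer to taking a shared lock via rlock() on a folly::Synchronized wrapper. Below is a minimal sketch of that read path, assuming folly is installed; the SpillStats struct is a simplified stand-in for velox::common::SpillStats, not the real definition.

```cpp
#include <folly/Synchronized.h>

#include <cstdint>
#include <iostream>

// Simplified stand-in for velox::common::SpillStats (assumption: the real
// struct has more counters; only what this sketch needs is shown).
struct SpillStats {
  uint64_t spilledBytes{0};
  uint64_t spilledRows{0};

  bool empty() const {
    return spilledBytes == 0 && spilledRows == 0;
  }

  SpillStats& operator+=(const SpillStats& other) {
    spilledBytes += other.spilledBytes;
    spilledRows += other.spilledRows;
    return *this;
  }
};

int main() {
  // Each writer owns one synchronized stats object; the data sink only reads.
  folly::Synchronized<SpillStats> writerStats;
  writerStats.wlock()->spilledBytes = 1024;  // Pretend a spill happened.

  SpillStats total;
  {
    // Shared (read) lock, mirroring the accumulation in HiveDataSink::stats().
    const auto locked = writerStats.rlock();
    if (!locked->empty()) {
      total += *locked;
    }
  }  // Lock released at end of scope.
  std::cout << "spilled bytes: " << total.spilledBytes << "\n";
  return 0;
}
```
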
4 changes: 2 additions & 2 deletions velox/connectors/hive/HiveDataSink.h
@@ -355,7 +355,7 @@ struct HiveWriterInfo {
std::shared_ptr<memory::MemoryPool> _sortPool)
: writerParameters(std::move(parameters)),
nonReclaimableSectionHolder(new tsan_atomic<bool>(false)),
spillStats(new common::SpillStats()),
spillStats(std::make_unique<folly::Synchronized<common::SpillStats>>()),
writerPool(std::move(_writerPool)),
sinkPool(std::move(_sinkPool)),
sortPool(std::move(_sortPool)) {}
@@ -364,7 +364,7 @@ struct HiveWriterInfo {
const std::unique_ptr<tsan_atomic<bool>> nonReclaimableSectionHolder;
/// Collects the spill stats from sort writer if the spilling has been
/// triggered.
const std::unique_ptr<common::SpillStats> spillStats;
const std::unique_ptr<folly::Synchronized<common::SpillStats>> spillStats;
const std::shared_ptr<memory::MemoryPool> writerPool;
const std::shared_ptr<memory::MemoryPool> sinkPool;
const std::shared_ptr<memory::MemoryPool> sortPool;
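
HiveWriterInfo now owns the stats as a unique_ptr<folly::Synchronized<common::SpillStats>> and hands the raw pointer down to the sort writer pipeline, which updates it under an exclusive lock. A hedged sketch of that ownership and handoff; SpillingComponent and WriterInfo are illustrative names, and SpillStats is again a simplified stand-in.

```cpp
#include <folly/Synchronized.h>

#include <cstdint>
#include <memory>

// Simplified stand-in for velox::common::SpillStats (assumption).
struct SpillStats {
  uint64_t spilledBytes{0};
  uint64_t spilledFiles{0};
};

// Hypothetical stand-in for a spilling component such as the sort buffer or
// spiller: it keeps a raw pointer to stats owned elsewhere and updates them
// under an exclusive lock whenever it spills.
class SpillingComponent {
 public:
  explicit SpillingComponent(folly::Synchronized<SpillStats>* stats)
      : stats_(stats) {}

  void spillSomething(uint64_t bytes) {
    auto locked = stats_->wlock();  // Exclusive lock for the update.
    locked->spilledBytes += bytes;
    ++locked->spilledFiles;
  }

 private:
  folly::Synchronized<SpillStats>* const stats_;
};

// Mirrors the HiveWriterInfo pattern: the writer info owns the synchronized
// stats and hands the raw pointer to the component that produces them.
struct WriterInfo {
  std::unique_ptr<folly::Synchronized<SpillStats>> spillStats{
      std::make_unique<folly::Synchronized<SpillStats>>()};
};

int main() {
  WriterInfo info;
  SpillingComponent component(info.spillStats.get());
  component.spillSomething(4096);
  return info.spillStats->rlock()->spilledBytes == 4096 ? 0 : 1;
}
```
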
12 changes: 3 additions & 9 deletions velox/dwio/common/SortingWriter.cpp
@@ -22,22 +22,20 @@ SortingWriter::SortingWriter(
std::unique_ptr<Writer> writer,
std::unique_ptr<exec::SortBuffer> sortBuffer,
uint32_t maxOutputRowsConfig,
uint64_t maxOutputBytesConfig,
velox::common::SpillStats* spillStats)
uint64_t maxOutputBytesConfig)
: outputWriter_(std::move(writer)),
maxOutputRowsConfig_(maxOutputRowsConfig),
maxOutputBytesConfig_(maxOutputBytesConfig),
sortPool_(sortBuffer->pool()),
canReclaim_(sortBuffer->canSpill()),
spillStats_(spillStats),
sortBuffer_(std::move(sortBuffer)) {
VELOX_CHECK_GT(maxOutputRowsConfig_, 0);
VELOX_CHECK_GT(maxOutputBytesConfig_, 0);
VELOX_CHECK_NOT_NULL(spillStats_);
if (sortPool_->parent()->reclaimer() != nullptr) {
sortPool_->setReclaimer(MemoryReclaimer::create(this));
}
setState(State::kRunning);
// TODO: set SortBuffer::spillStats_.
}

SortingWriter::~SortingWriter() {
@@ -64,11 +62,7 @@ void SortingWriter::close() {
outputWriter_->write(output);
output = sortBuffer_->getOutput(maxOutputBatchRows);
}
auto spillStatsOr = sortBuffer_->spilledStats();
if (spillStatsOr.has_value()) {
VELOX_CHECK(canReclaim_);
*spillStats_ = spillStatsOr.value();
}

sortBuffer_.reset();
sortPool_->release();
outputWriter_->close();
4 changes: 1 addition & 3 deletions velox/dwio/common/SortingWriter.h
@@ -29,8 +29,7 @@ class SortingWriter : public Writer {
std::unique_ptr<Writer> writer,
std::unique_ptr<exec::SortBuffer> sortBuffer,
uint32_t maxOutputRowsConfig,
uint64_t maxOutputBytesConfig,
velox::common::SpillStats* spillStats);
uint64_t maxOutputBytesConfig);

~SortingWriter() override;

@@ -81,7 +80,6 @@ class SortingWriter : public Writer {
const uint64_t maxOutputBytesConfig_;
memory::MemoryPool* const sortPool_;
const bool canReclaim_;
velox::common::SpillStats* const spillStats_;

std::unique_ptr<exec::SortBuffer> sortBuffer_;
};
18 changes: 13 additions & 5 deletions velox/exec/GroupingSet.cpp
@@ -52,7 +52,8 @@ GroupingSet::GroupingSet(
const std::optional<column_index_t>& groupIdChannel,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection,
OperatorCtx* operatorCtx)
OperatorCtx* operatorCtx,
folly::Synchronized<common::SpillStats>* spillStats)
: preGroupedKeyChannels_(std::move(preGroupedKeys)),
hashers_(std::move(hashers)),
isGlobal_(hashers_.empty()),
@@ -69,7 +70,8 @@
stringAllocator_(operatorCtx->pool()),
rows_(operatorCtx->pool()),
isAdaptive_(queryConfig_.hashAdaptivityEnabled()),
pool_(*operatorCtx->pool()) {
pool_(*operatorCtx->pool()),
spillStats_(spillStats) {
VELOX_CHECK_NOT_NULL(nonReclaimableSection_);
VELOX_CHECK(pool_.trackUsage());
for (auto& hasher : hashers_) {
@@ -131,7 +133,8 @@ std::unique_ptr<GroupingSet> GroupingSet::createForMarkDistinct(
/*groupIdColumn*/ std::nullopt,
/*spillConfig*/ nullptr,
nonReclaimableSection,
operatorCtx);
operatorCtx,
/*spillStats_*/ nullptr);
};

namespace {
@@ -939,7 +942,8 @@ void GroupingSet::spill() {
makeSpillType(),
rows->keyTypes().size(),
std::vector<CompareFlags>(),
spillConfig_);
spillConfig_,
spillStats_);
}
spiller_->spill();
if (sortedAggregations_) {
@@ -958,7 +962,11 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) {
auto* rows = table_->rows();
VELOX_CHECK(pool_.trackUsage());
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kAggregateOutput, rows, makeSpillType(), spillConfig_);
Spiller::Type::kAggregateOutput,
rows,
makeSpillType(),
spillConfig_,
spillStats_);

spiller_->spill(rowIterator);
table_->clear();
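
GroupingSet now accepts the synchronized stats pointer in its constructor and forwards it to each Spiller it creates, while createForMarkDistinct passes nullptr (that path is built without a spill config, so in the real code the pointer should never be consumed). A hedged sketch of forwarding a possibly-null stats pointer follows; ToyGroupingSet and ToySpiller are illustrative, not the actual Velox signatures, and the null guard exists only to keep the toy self-contained.

```cpp
#include <folly/Synchronized.h>

#include <cstdint>

// Simplified stand-in for velox::common::SpillStats (assumption).
struct SpillStats {
  uint64_t spilledPartitions{0};
};

// Illustrative consumer, loosely mirroring how a spiller might record into
// shared stats; here a null pointer simply means "nobody is collecting".
class ToySpiller {
 public:
  explicit ToySpiller(folly::Synchronized<SpillStats>* stats) : stats_(stats) {}

  void spill() {
    if (stats_ != nullptr) {
      ++stats_->wlock()->spilledPartitions;
    }
  }

 private:
  folly::Synchronized<SpillStats>* const stats_;
};

// Illustrative owner, loosely mirroring GroupingSet: it stores the pointer it
// was given and forwards it when it lazily creates the spiller.
class ToyGroupingSet {
 public:
  explicit ToyGroupingSet(folly::Synchronized<SpillStats>* spillStats)
      : spillStats_(spillStats) {}

  void spill() {
    ToySpiller spiller(spillStats_);  // Forward the same pointer.
    spiller.spill();
  }

 private:
  folly::Synchronized<SpillStats>* const spillStats_;
};

int main() {
  folly::Synchronized<SpillStats> stats;
  ToyGroupingSet withStats(&stats);
  withStats.spill();

  ToyGroupingSet markDistinct(nullptr);  // Mark-distinct path: no collection.
  markDistinct.spill();

  return stats.rlock()->spilledPartitions == 1 ? 0 : 1;
}
```
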
5 changes: 4 additions & 1 deletion velox/exec/GroupingSet.h
@@ -40,7 +40,8 @@ class GroupingSet {
const std::optional<column_index_t>& groupIdChannel,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection,
OperatorCtx* operatorCtx);
OperatorCtx* operatorCtx,
folly::Synchronized<common::SpillStats>* spillStats);

~GroupingSet();

@@ -359,6 +360,8 @@ class GroupingSet {
// Temporary for case where an aggregate in toIntermediate() outputs post-init
// state of aggregate for all rows.
std::vector<char*> firstGroup_;

folly::Synchronized<common::SpillStats>* const spillStats_;
};

} // namespace facebook::velox::exec
14 changes: 2 additions & 12 deletions velox/exec/HashAggregation.cpp
@@ -106,7 +106,8 @@ void HashAggregation::initialize() {
groupIdChannel,
spillConfig_.has_value() ? &spillConfig_.value() : nullptr,
&nonReclaimableSection_,
operatorCtx_.get());
operatorCtx_.get(),
&spillStats_);

aggregationNode_.reset();
}
@@ -188,13 +189,6 @@ void HashAggregation::updateRuntimeStats() {
RuntimeMetric(hashTableStats.numTombstones);
}

void HashAggregation::recordSpillStats() {
auto spillStatsOr = groupingSet_->spilledStats();
if (spillStatsOr.has_value()) {
Operator::recordSpillStats(spillStatsOr.value());
}
}

void HashAggregation::prepareOutput(vector_size_t size) {
if (output_) {
VectorPtr output = std::move(output_);
@@ -388,7 +382,6 @@ void HashAggregation::noMoreInput() {
updateEstimatedOutputRowSize();
groupingSet_->noMoreInput();
Operator::noMoreInput();
recordSpillStats();
// Release the extra reserved memory right after processing all the inputs.
pool()->release();
}
@@ -429,9 +422,6 @@ void HashAggregation::reclaim(
// Spill all the rows starting from the next output row pointed by
// 'resultIterator_'.
groupingSet_->spill(resultIterator_);
// NOTE: we will only spill once during the output processing stage so
// record stats here.
recordSpillStats();
} else {
// TODO: support fine-grain disk spilling based on 'targetBytes' after
// having row container memory compaction support later.
4 changes: 0 additions & 4 deletions velox/exec/HashAggregation.h
@@ -72,10 +72,6 @@ class HashAggregation : public Operator {

RowVectorPtr getDistinctOutput();

// Invoked to record the spilling stats in operator stats after processing all
// the inputs.
void recordSpillStats();

void updateEstimatedOutputRowSize();

std::shared_ptr<const core::AggregationNode> aggregationNode_;
25 changes: 7 additions & 18 deletions velox/exec/HashBuild.cpp
@@ -231,6 +231,7 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
<< spillConfig.maxSpillLevel
<< ", and disable spilling for memory pool: "
<< pool()->name();
++spillStats_.wlock()->spillMaxLevelExceededCount;
exceededMaxSpillLevelLimit_ = true;
return;
}
@@ -243,7 +244,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
table_->rows(),
spillType_,
std::move(hashBits),
&spillConfig);
&spillConfig,
&spillStats_);

const int32_t numPartitions = spiller_->hashBits().numPartitions();
spillInputIndicesBuffers_.resize(numPartitions);
@@ -732,15 +734,15 @@ bool HashBuild::finishHashBuild() {
}
if (spiller != nullptr) {
spiller->finishSpill(spillPartitions);
build->recordSpillStats(spiller.get());
build->restoreSpillLimit();
}
}

if (spiller_ != nullptr) {
spiller_->finishSpill(spillPartitions);
removeEmptyPartitions(spillPartitions);
}
recordSpillStats();
restoreSpillLimit();

// TODO: re-enable parallel join build with spilling triggered after
// https://github.com/facebookincubator/velox/issues/3567 is fixed.
@@ -765,21 +767,8 @@ bool HashBuild::finishHashBuild() {
return true;
}

void HashBuild::recordSpillStats() {
recordSpillStats(spiller_.get());
}

void HashBuild::recordSpillStats(Spiller* spiller) {
if (spiller != nullptr) {
const auto spillStats = spiller->stats();
VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0);
Operator::recordSpillStats(spillStats);
} else if (exceededMaxSpillLevelLimit_) {
exceededMaxSpillLevelLimit_ = false;
common::SpillStats spillStats;
spillStats.spillMaxLevelExceededCount = 1;
Operator::recordSpillStats(spillStats);
}
void HashBuild::restoreSpillLimit() {
exceededMaxSpillLevelLimit_ = false;
}

void HashBuild::ensureTableFits(uint64_t numRows) {
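
HashBuild now records the max-spill-level overflow directly on its synchronized member with ++spillStats_.wlock()->spillMaxLevelExceededCount. When several fields need to change together, folly::Synchronized also offers withWLock, which holds the exclusive lock across a whole lambda; a small hedged sketch (SpillStats is again a simplified stand-in, and this is an alternative folly idiom, not code from the commit).

```cpp
#include <folly/Synchronized.h>

#include <cstdint>

// Simplified stand-in for velox::common::SpillStats (assumption).
struct SpillStats {
  uint64_t spillMaxLevelExceededCount{0};
  uint64_t spilledBytes{0};
  uint64_t spilledRows{0};
};

int main() {
  folly::Synchronized<SpillStats> spillStats;

  // Single-field update: one short-lived exclusive lock, as in the diff.
  ++spillStats.wlock()->spillMaxLevelExceededCount;

  // Multi-field update: withWLock keeps the lock for the whole lambda, so the
  // two counters change atomically with respect to concurrent readers.
  spillStats.withWLock([](SpillStats& stats) {
    stats.spilledBytes += 1 << 20;
    stats.spilledRows += 1000;
  });

  return spillStats.rlock()->spilledRows == 1000 ? 0 : 1;
}
```
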
3 changes: 1 addition & 2 deletions velox/exec/HashBuild.h
@@ -117,8 +117,7 @@ class HashBuild final : public Operator {
return canReclaim();
}

void recordSpillStats();
void recordSpillStats(Spiller* spiller);
void restoreSpillLimit();

// Indicates if the input is read from spill data or not.
bool isInputFromSpill() const;
11 changes: 5 additions & 6 deletions velox/exec/HashProbe.cpp
@@ -253,7 +253,8 @@ void HashProbe::maybeSetupSpillInput(
spillInputPartitionIds_.begin()->partitionBitOffset(),
spillInputPartitionIds_.begin()->partitionBitOffset() +
spillConfig.numPartitionBits),
&spillConfig);
&spillConfig,
&spillStats_);
// Set the spill partitions to the corresponding ones at the build side. The
// hash probe operator itself won't trigger any spilling.
spiller_->setPartitionsSpilled(toPartitionNumSet(spillInputPartitionIds_));
@@ -1413,11 +1414,9 @@ void HashProbe::noMoreInputInternal() {
}

void HashProbe::recordSpillStats() {
VELOX_CHECK_NOT_NULL(spiller_);
const auto spillStats = spiller_->stats();
VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0);
VELOX_CHECK_EQ(spillStats.spillFillTimeUs, 0);
Operator::recordSpillStats(spillStats);
VELOX_CHECK_EQ(spillStats_.rlock()->spillSortTimeUs, 0);
VELOX_CHECK_EQ(spillStats_.rlock()->spillFillTimeUs, 0);
Operator::recordSpillStats();
}

bool HashProbe::isFinished() {
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.h
@@ -220,7 +220,7 @@ class HashProbe : public Operator {
// next hash table from the spilled data.
void noMoreInputInternal();

void recordSpillStats();
void recordSpillStats() override;

// Returns the index of the 'match' column in the output for semi project
// joins.