Skip to content

Commit

Permalink
fix: Fix the memory reclaim bytes for hash join (#11642)
Browse files Browse the repository at this point in the history
Summary:

Both hash join and probe does the coordinated spill so we shouldn't report the reclaimed bytes from a single node
but shall report from the plan node. Also probe side spill might spill built table from join side and the memory is
actually reclaimed from build side pool instead of probe side.

This PR also removes the unused wait for spill state from hash build

Differential Revision: D66437719
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Nov 26, 2024
1 parent 8f6d897 commit 978be89
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 34 deletions.
6 changes: 3 additions & 3 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1140,14 +1140,14 @@ std::string blockingReasonToString(BlockingReason reason) {
return "kWaitForMemory";
case BlockingReason::kWaitForConnector:
return "kWaitForConnector";
case BlockingReason::kWaitForSpill:
return "kWaitForSpill";
case BlockingReason::kYield:
return "kYield";
case BlockingReason::kWaitForArbitration:
return "kWaitForArbitration";
default:
VELOX_UNREACHABLE(
fmt::format("Unknown blocking reason {}", static_cast<int>(reason)));
}
VELOX_UNREACHABLE();
}

DriverThreadContext* driverThreadContext() {
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ enum class BlockingReason {
kWaitForConnector,
/// Build operator is blocked waiting for all its peers to stop to run group
/// spill on all of them.
///
/// TODO: remove this after Prestissimo is updated.
kWaitForSpill,
/// Some operators (like Table Scan) may run long loops and can 'voluntarily'
/// exit them because Task requested to yield or stop or after a certain time.
Expand Down
13 changes: 0 additions & 13 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ BlockingReason fromStateToBlockingReason(HashBuild::State state) {
return BlockingReason::kNotBlocked;
case HashBuild::State::kYield:
return BlockingReason::kYield;
case HashBuild::State::kWaitForSpill:
return BlockingReason::kWaitForSpill;
case HashBuild::State::kWaitForBuild:
return BlockingReason::kWaitForJoinBuild;
case HashBuild::State::kWaitForProbe:
Expand Down Expand Up @@ -944,13 +942,6 @@ BlockingReason HashBuild::isBlocked(ContinueFuture* future) {
break;
case State::kFinish:
break;
case State::kWaitForSpill:
if (!future_.valid()) {
setRunning();
VELOX_CHECK_NOT_NULL(input_);
addInput(std::move(input_));
}
break;
case State::kWaitForBuild:
[[fallthrough]];
case State::kWaitForProbe:
Expand Down Expand Up @@ -1003,8 +994,6 @@ void HashBuild::checkStateTransition(State state) {
break;
case State::kWaitForBuild:
[[fallthrough]];
case State::kWaitForSpill:
[[fallthrough]];
case State::kWaitForProbe:
[[fallthrough]];
case State::kFinish:
Expand All @@ -1022,8 +1011,6 @@ std::string HashBuild::stateName(State state) {
return "RUNNING";
case State::kYield:
return "YIELD";
case State::kWaitForSpill:
return "WAIT_FOR_SPILL";
case State::kWaitForBuild:
return "WAIT_FOR_BUILD";
case State::kWaitForProbe:
Expand Down
9 changes: 3 additions & 6 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,14 @@ class HashBuild final : public Operator {
/// The yield state that voluntarily yield cpu after running too long when
/// processing input from spilled file.
kYield = 2,
/// The state that waits for the pending group spill to finish. This state
/// only applies if disk spilling is enabled.
kWaitForSpill = 3,
/// The state that waits for the hash tables to be merged together.
kWaitForBuild = 4,
kWaitForBuild = 3,
/// The state that waits for the hash probe to finish before start to build
/// the hash table for one of previously spilled partition. This state only
/// applies if disk spilling is enabled.
kWaitForProbe = 5,
kWaitForProbe = 4,
/// The finishing state.
kFinish = 6,
kFinish = 5,
};
static std::string stateName(State state);

Expand Down
24 changes: 14 additions & 10 deletions velox/exec/HashJoinBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,19 +382,20 @@ uint64_t HashJoinMemoryReclaimer::reclaim(
uint64_t targetBytes,
uint64_t maxWaitMs,
memory::MemoryReclaimer::Stats& stats) {
const auto prevNodeReservedMemory = pool->reservedBytes();

// The flags to track if we have reclaimed from both build and probe operators
// under a hash join node.
bool hasReclaimedFromBuild{false};
bool hasReclaimedFromProbe{false};
uint64_t reclaimedBytes{0};
pool->visitChildren([&](memory::MemoryPool* child) {
VELOX_CHECK_EQ(child->kind(), memory::MemoryPool::Kind::kLeaf);
const bool isBuild = isHashBuildMemoryPool(*child);
if (isBuild) {
if (!hasReclaimedFromBuild) {
// We just need to reclaim from any one of the hash build operator.
hasReclaimedFromBuild = true;
reclaimedBytes += child->reclaim(targetBytes, maxWaitMs, stats);
child->reclaim(targetBytes, maxWaitMs, stats);
}
return !hasReclaimedFromProbe;
}
Expand All @@ -403,22 +404,25 @@ uint64_t HashJoinMemoryReclaimer::reclaim(
// The same as build operator, we only need to reclaim from any one of the
// hash probe operator.
hasReclaimedFromProbe = true;
reclaimedBytes += child->reclaim(targetBytes, maxWaitMs, stats);
child->reclaim(targetBytes, maxWaitMs, stats);
}
return !hasReclaimedFromBuild;
});
if (reclaimedBytes != 0) {
return reclaimedBytes;

auto currNodeReservedMemory = pool->reservedBytes();
VELOX_CHECK_LE(currNodeReservedMemory, prevNodeReservedMemory);
if (currNodeReservedMemory < prevNodeReservedMemory) {
return prevNodeReservedMemory - currNodeReservedMemory;
}

auto joinBridge = joinBridge_.lock();
if (joinBridge == nullptr) {
return reclaimedBytes;
return 0;
}
const auto oldNodeReservedMemory = pool->reservedBytes();
joinBridge->reclaim();
const auto newNodeReservedMemory = pool->reservedBytes();
VELOX_CHECK_LE(newNodeReservedMemory, oldNodeReservedMemory);
return oldNodeReservedMemory - newNodeReservedMemory;
currNodeReservedMemory = pool->reservedBytes();
VELOX_CHECK_LE(currNodeReservedMemory, prevNodeReservedMemory);
return prevNodeReservedMemory - currNodeReservedMemory;
}

bool isHashBuildMemoryPool(const memory::MemoryPool& pool) {
Expand Down
Loading

0 comments on commit 978be89

Please sign in to comment.