Skip to content

Commit

Permalink
Parallelize local memory arbitration (facebookincubator#9649)
Browse files Browse the repository at this point in the history
Summary:
Parallelize the local memory arbitration execution. The workload flow of memory arbitration process
changes as follows:
First wait in the arbitration queue to serialize the memory arbitration request processing from the same
query pool. Adds ArbitrationQueue data structure for this which contains the wait promises of the
arbitration requests from the same query pool. The arbitration queue is protected by arbitrator lock.

Second calls runLocalArbitration to run local arbitration which acquires the reader lock of arbitration
lock (which is added by this pr for local/global arbitration execution control). Then it ensures the request
memory pool is within the capacity limit (this might trigger spill to reclaim the used memory from the
request pool itself), and tries to allocate free capacity from the arbitrator or reclaim the free memory from
the other queries which is protected by arbitrator lock.

Third if runLocalArbitration can't reclaim sufficient memory, then proceeds with runGlobalArbitration which
acquires the writer lock of arbitration lock. Then it reclaims the free capacity from the arbitrator or the other
queries. And at last reclaims the used memory from the other queries by spilling.

ArbitrationOperation is added to contains the state of arbitration request processing to simplify the code
implementation.

This PR also adds an option to disable global arbitration and only do local arbitration which can always
run in parallel. With this option, for 2hrs stress test (1000 query concurrency at coordinator) with Meta
internal production workloads replay, the averaged query execution time has been reduced by ~30% and cpu
time is kept the same. The memory arbitration wall time for query without triggering spilled is only 9 mins in
total.

Pull Request resolved: facebookincubator#9649

Reviewed By: tanjialiang, oerling

Differential Revision: D56685200

Pulled By: xiaoxmeng

fbshipit-source-id: f8db71e4cae05f24f913464c37c5bc1e6528083b
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed May 7, 2024
1 parent ec4d2ec commit 5911129
Show file tree
Hide file tree
Showing 16 changed files with 1,401 additions and 423 deletions.
15 changes: 11 additions & 4 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,18 @@ void registerVeloxMetrics() {
kMetricArbitratorGlobalArbitrationCount,
facebook::velox::StatType::COUNT);

// The distribution of the amount of time an arbitration request stays queued
// in range of [0, 600s] with 20 buckets. It is configured to report the
// latency at P50, P90, P99, and P100 percentiles.
// The number of global arbitration that reclaims used memory by slow disk
// spilling.
DEFINE_METRIC(
kMetricArbitratorSlowGlobalArbitrationCount,
facebook::velox::StatType::COUNT);

// The distribution of the amount of time an arbitration operation stays in
// arbitration queues and waits the arbitration r/w locks in range of [0,
// 600s] with 20 buckets. It is configured to report the latency at P50, P90,
// P99, and P100 percentiles.
DEFINE_HISTOGRAM_METRIC(
kMetricArbitratorQueueTimeMs, 30'000, 0, 600'000, 50, 90, 99, 100);
kMetricArbitratorWaitTimeMs, 30'000, 0, 600'000, 50, 90, 99, 100);

// The distribution of the amount of time it take to complete a single
// arbitration request stays queued in range of [0, 600s] with 20
Expand Down
19 changes: 11 additions & 8 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,21 @@ constexpr folly::StringPiece kMetricArbitratorLocalArbitrationCount{
constexpr folly::StringPiece kMetricArbitratorGlobalArbitrationCount{
"velox.arbitrator_global_arbitration_count"};

constexpr folly::StringPiece kMetricArbitratorQueueTimeMs{
"velox.arbitrator_queue_time_ms"};
constexpr folly::StringPiece kMetricArbitratorSlowGlobalArbitrationCount{
"velox.arbitrator_slow_global_arbitration_count"};

constexpr folly::StringPiece kMetricArbitratorAbortedCount{
"velox.arbitrator_aborted_count"};

constexpr folly::StringPiece kMetricArbitratorFailuresCount{
"velox.arbitrator_failures_count"};

constexpr folly::StringPiece kMetricArbitratorArbitrationTimeMs{
"velox.arbitrator_arbitration_time_ms"};

constexpr folly::StringPiece kMetricArbitratorWaitTimeMs{
"velox.arbitrator_wait_time_ms"};

constexpr folly::StringPiece kMetricArbitratorFreeCapacityBytes{
"velox.arbitrator_free_capacity_bytes"};

Expand Down Expand Up @@ -122,10 +131,4 @@ constexpr folly::StringPiece kMetricFileWriterEarlyFlushedRawBytes{

constexpr folly::StringPiece kMetricArbitratorRequestsCount{
"velox.arbitrator_requests_count"};

constexpr folly::StringPiece kMetricArbitratorAbortedCount{
"velox.arbitrator_aborted_count"};

constexpr folly::StringPiece kMetricArbitratorFailuresCount{
"velox.arbitrator_failures_count"};
} // namespace facebook::velox
1 change: 1 addition & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ std::unique_ptr<MemoryArbitrator> createArbitrator(
.memoryPoolReservedCapacity = options.memoryPoolReservedCapacity,
.memoryPoolTransferCapacity = options.memoryPoolTransferCapacity,
.memoryReclaimWaitMs = options.memoryReclaimWaitMs,
.globalArbitrationEnabled = options.globalArbitrationEnabled,
.arbitrationStateCheckCb = options.arbitrationStateCheckCb,
.checkUsageLeak = options.checkUsageLeak});
}
Expand Down
4 changes: 4 additions & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ struct MemoryManagerOptions {
/// zero, then there is no timeout. The default is 5 mins.
uint64_t memoryReclaimWaitMs{300'000};

/// If true, it allows memory arbitrator to reclaim used memory cross query
/// memory pools.
bool globalArbitrationEnabled{false};

/// Provided by the query system to validate the state after a memory pool
/// enters arbitration if not null. For instance, Prestissimo provides
/// callback to check if a memory arbitration request is issued from a driver
Expand Down
3 changes: 1 addition & 2 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,11 @@ MemoryArbitrator::Stats::Stats(

std::string MemoryArbitrator::Stats::toString() const {
return fmt::format(
"STATS[numRequests {} numSucceeded {} numAborted {} numFailures {} "
"STATS[numRequests {} numAborted {} numFailures {} "
"numNonReclaimableAttempts {} numReserves {} numReleases {} "
"queueTime {} arbitrationTime {} reclaimTime {} shrunkMemory {} "
"reclaimedMemory {} maxCapacity {} freeCapacity {} freeReservedCapacity {}]",
numRequests,
numSucceeded,
numAborted,
numFailures,
numNonReclaimableAttempts,
Expand Down
6 changes: 6 additions & 0 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class MemoryArbitrator {
/// timeout.
uint64_t memoryReclaimWaitMs{0};

/// If true, it allows memory arbitrator to reclaim used memory cross query
/// memory pools.
bool globalArbitrationEnabled{false};

/// Provided by the query system to validate the state after a memory pool
/// enters arbitration if not null. For instance, Prestissimo provides
/// callback to check if a memory arbitration request is issued from a
Expand Down Expand Up @@ -253,6 +257,7 @@ class MemoryArbitrator {
memoryPoolReservedCapacity_(config.memoryPoolReservedCapacity),
memoryPoolTransferCapacity_(config.memoryPoolTransferCapacity),
memoryReclaimWaitMs_(config.memoryReclaimWaitMs),
globalArbitrationEnabled_(config.globalArbitrationEnabled),
arbitrationStateCheckCb_(config.arbitrationStateCheckCb),
checkUsageLeak_(config.checkUsageLeak) {
VELOX_CHECK_LE(reservedCapacity_, capacity_);
Expand All @@ -263,6 +268,7 @@ class MemoryArbitrator {
const uint64_t memoryPoolReservedCapacity_;
const uint64_t memoryPoolTransferCapacity_;
const uint64_t memoryReclaimWaitMs_;
const bool globalArbitrationEnabled_;
const MemoryArbitrationStateCheckCB arbitrationStateCheckCb_;
const bool checkUsageLeak_;
};
Expand Down
10 changes: 7 additions & 3 deletions velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,8 @@ bool MemoryPoolImpl::maybeIncrementReservation(uint64_t size) {

// NOTE: we allow memory pool to overuse its memory during the memory
// arbitration process. The memory arbitration process itself needs to
// ensure the the memory pool usage of the memory pool is within the
// capacity limit after the arbitration operation completes.
// ensure the memory pool usage of the memory pool is within the capacity
// limit after the arbitration operation completes.
if (FOLLY_UNLIKELY(
(reservationBytes_ + size > capacity_) &&
!underMemoryArbitration())) {
Expand Down Expand Up @@ -954,7 +954,11 @@ uint64_t MemoryPoolImpl::freeBytes() const {
if (capacity_ == kMaxMemory) {
return 0;
}
VELOX_CHECK_GE(capacity_, reservationBytes_);
if (capacity_ < reservationBytes_) {
// NOTE: the memory reservation could be temporarily larger than its
// capacity if this memory pool is under memory arbitration processing.
return 0;
}
return capacity_ - reservationBytes_;
}

Expand Down
Loading

0 comments on commit 5911129

Please sign in to comment.