From 9882884346c633c486bb4bdc4eb15876ba7f165d Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Wed, 13 Apr 2022 16:46:57 -0700 Subject: [PATCH] Use ContinuePromise (#1410) Summary: Use ContinuePromise instead of VeloxPromise throughout the code base for consistency (with ContinueFuture) and readability. Pull Request resolved: https://github.com/facebookincubator/velox/pull/1410 Reviewed By: kagamiori Differential Revision: D35617219 Pulled By: mbasmanova fbshipit-source-id: 755951f4dc57204581baa67294bc5107df8e9219 --- velox/common/future/VeloxPromise.h | 3 +++ velox/exec/CrossJoinBuild.cpp | 4 ++-- velox/exec/Exchange.h | 2 +- velox/exec/HashBuild.cpp | 6 +++--- velox/exec/HashProbe.cpp | 2 +- velox/exec/JoinBridge.cpp | 4 ++-- velox/exec/JoinBridge.h | 4 ++-- velox/exec/LocalPartition.cpp | 20 +++++++++---------- velox/exec/LocalPartition.h | 6 +++--- velox/exec/MergeSource.cpp | 10 +++++----- velox/exec/MergeSource.h | 4 ++-- velox/exec/PartitionedOutputBufferManager.cpp | 12 +++++------ velox/exec/PartitionedOutputBufferManager.h | 4 ++-- velox/exec/Task.cpp | 6 +++--- velox/exec/Task.h | 8 +++----- velox/exec/TaskStructs.h | 4 ++-- velox/exec/tests/utils/Cursor.cpp | 4 ++-- velox/exec/tests/utils/Cursor.h | 4 ++-- 18 files changed, 54 insertions(+), 53 deletions(-) diff --git a/velox/common/future/VeloxPromise.h b/velox/common/future/VeloxPromise.h index 18cd3ff9f539..45ca6373afba 100644 --- a/velox/common/future/VeloxPromise.h +++ b/velox/common/future/VeloxPromise.h @@ -68,4 +68,7 @@ std::pair, folly::SemiFuture> makeVeloxPromiseContract( auto f = p.getSemiFuture(); return std::make_pair(std::move(p), std::move(f)); } + +using ContinuePromise = VeloxPromise; + } // namespace facebook::velox diff --git a/velox/exec/CrossJoinBuild.cpp b/velox/exec/CrossJoinBuild.cpp index a0d7ccd6dcf2..c26f1299b3db 100644 --- a/velox/exec/CrossJoinBuild.cpp +++ b/velox/exec/CrossJoinBuild.cpp @@ -19,7 +19,7 @@ namespace facebook::velox::exec { void CrossJoinBridge::setData(std::vector data) { - std::vector> promises; + std::vector promises; { std::lock_guard l(mutex_); VELOX_CHECK(!data_.has_value(), "setData may be called only once"); @@ -72,7 +72,7 @@ BlockingReason CrossJoinBuild::isBlocked(ContinueFuture* future) { void CrossJoinBuild::noMoreInput() { Operator::noMoreInput(); - std::vector> promises; + std::vector promises; std::vector> peers; // The last Driver to hit CrossJoinBuild::finish gathers the data from // all build Drivers and hands it over to the probe side. At this diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h index afd003192947..67a85d038f9f 100644 --- a/velox/exec/Exchange.h +++ b/velox/exec/Exchange.h @@ -177,7 +177,7 @@ class ExchangeQueue { bool atEnd_ = false; std::mutex mutex_; std::deque> queue_; - std::vector> promises_; + std::vector promises_; // When set, all promises will be realized and the next dequeue will // throw an exception with this message. std::string error_; diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index c300681dfcde..81c9956bdb61 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -23,7 +23,7 @@ namespace facebook::velox::exec { void HashJoinBridge::setHashTable(std::unique_ptr table) { VELOX_CHECK(table, "setHashTable called with null table"); - std::vector> promises; + std::vector promises; { std::lock_guard l(mutex_); VELOX_CHECK(!table_, "setHashTable may be called only once"); @@ -35,7 +35,7 @@ void HashJoinBridge::setHashTable(std::unique_ptr table) { } void HashJoinBridge::setAntiJoinHasNullKeys() { - std::vector> promises; + std::vector promises; { std::lock_guard l(mutex_); VELOX_CHECK( @@ -191,7 +191,7 @@ void HashBuild::noMoreInput() { } Operator::noMoreInput(); - std::vector> promises; + std::vector promises; std::vector> peers; // The last Driver to hit HashBuild::finish gathers the data from // all build Drivers and hands it over to the probe side. At this diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index b2dd59738f03..a1476a9361ac 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -576,7 +576,7 @@ void HashProbe::ensureLoadedIfNotAtEnd(ChannelIndex channel) { void HashProbe::noMoreInput() { Operator::noMoreInput(); if (isRightJoin(joinType_) || isFullJoin(joinType_)) { - std::vector> promises; + std::vector promises; std::vector> peers; // The last Driver to hit HashProbe::finish is responsible for producing // non-matching build-side rows for the right join. diff --git a/velox/exec/JoinBridge.cpp b/velox/exec/JoinBridge.cpp index 0ac21ee0d738..95f9af9c25f4 100644 --- a/velox/exec/JoinBridge.cpp +++ b/velox/exec/JoinBridge.cpp @@ -18,14 +18,14 @@ namespace facebook::velox::exec { // static -void JoinBridge::notify(std::vector> promises) { +void JoinBridge::notify(std::vector promises) { for (auto& promise : promises) { promise.setValue(true); } } void JoinBridge::cancel() { - std::vector> promises; + std::vector promises; { std::lock_guard l(mutex_); cancelled_ = true; diff --git a/velox/exec/JoinBridge.h b/velox/exec/JoinBridge.h index f1823b8b5071..3cf50773bc13 100644 --- a/velox/exec/JoinBridge.h +++ b/velox/exec/JoinBridge.h @@ -28,10 +28,10 @@ class JoinBridge { void cancel(); protected: - static void notify(std::vector> promises); + static void notify(std::vector promises); std::mutex mutex_; - std::vector> promises_; + std::vector promises_; bool cancelled_{false}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/LocalPartition.cpp b/velox/exec/LocalPartition.cpp index 19979f1c3b94..28b8cff42019 100644 --- a/velox/exec/LocalPartition.cpp +++ b/velox/exec/LocalPartition.cpp @@ -19,7 +19,7 @@ namespace facebook::velox::exec { namespace { -void notify(std::vector>& promises) { +void notify(std::vector& promises) { for (auto& promise : promises) { promise.setValue(true); } @@ -42,7 +42,7 @@ bool LocalExchangeMemoryManager::increaseMemoryUsage( } void LocalExchangeMemoryManager::decreaseMemoryUsage(int64_t removed) { - std::vector> promises; + std::vector promises; { std::lock_guard l(mutex_); bufferedBytes_ -= removed; @@ -64,8 +64,8 @@ void LocalExchangeQueue::addProducer() { } void LocalExchangeQueue::noMoreProducers() { - std::vector> consumerPromises; - std::vector> producerPromises; + std::vector consumerPromises; + std::vector producerPromises; queue_.withWLock([&](auto& queue) { VELOX_CHECK(!noMoreProducers_, "noMoreProducers can be called only once"); noMoreProducers_ = true; @@ -88,7 +88,7 @@ BlockingReason LocalExchangeQueue::enqueue( ContinueFuture* future) { auto inputBytes = input->retainedSize(); - std::vector> consumerPromises; + std::vector consumerPromises; bool isClosed = queue_.withWLock([&](auto& queue) { if (closed_) { return true; @@ -112,8 +112,8 @@ BlockingReason LocalExchangeQueue::enqueue( } void LocalExchangeQueue::noMoreData() { - std::vector> consumerPromises; - std::vector> producerPromises; + std::vector consumerPromises; + std::vector producerPromises; queue_.withWLock([&](auto queue) { VELOX_CHECK_GT(pendingProducers_, 0); --pendingProducers_; @@ -132,7 +132,7 @@ BlockingReason LocalExchangeQueue::next( ContinueFuture* future, memory::MemoryPool* pool, RowVectorPtr* data) { - std::vector> producerPromises; + std::vector producerPromises; auto blockingReason = queue_.withWLock([&](auto& queue) { *data = nullptr; if (queue.empty()) { @@ -192,8 +192,8 @@ bool LocalExchangeQueue::isFinished() { } void LocalExchangeQueue::close() { - std::vector> producerPromises; - std::vector> consumerPromises; + std::vector producerPromises; + std::vector consumerPromises; queue_.withWLock([&](auto& queue) { uint64_t freedBytes = 0; while (!queue.empty()) { diff --git a/velox/exec/LocalPartition.h b/velox/exec/LocalPartition.h index 9d6e5c0748d0..e030bbc60e10 100644 --- a/velox/exec/LocalPartition.h +++ b/velox/exec/LocalPartition.h @@ -37,7 +37,7 @@ class LocalExchangeMemoryManager { const int64_t maxBufferSize_; std::mutex mutex_; int64_t bufferedBytes_{0}; - std::vector> promises_; + std::vector promises_; }; /// Buffers data for a single partition produced by local exchange. Allows @@ -102,11 +102,11 @@ class LocalExchangeQueue { // Satisfied when data becomes available or all producers report that they // finished producing, e.g. queue_ is not empty or noMoreProducers_ is true // and pendingProducers_ is zero. - std::vector> consumerPromises_; + std::vector consumerPromises_; // Satisfied when all data has been fetched and no more data will be produced, // e.g. queue_ is empty, noMoreProducers_ is true and pendingProducers_ is // zero. - std::vector> producerPromises_; + std::vector producerPromises_; int pendingProducers_{0}; bool noMoreProducers_{false}; bool closed_{false}; diff --git a/velox/exec/MergeSource.cpp b/velox/exec/MergeSource.cpp index bdfc14e4bb1f..2872ca1e20e3 100644 --- a/velox/exec/MergeSource.cpp +++ b/velox/exec/MergeSource.cpp @@ -104,8 +104,8 @@ class LocalMergeSource : public MergeSource { bool atEnd_{false}; boost::circular_buffer data_; - std::vector> consumerPromises_; - std::vector> producerPromises_; + std::vector consumerPromises_; + std::vector producerPromises_; }; folly::Synchronized queue_; @@ -192,7 +192,7 @@ std::shared_ptr MergeSource::createMergeExchangeSource( } namespace { -void notify(std::optional>& promise) { +void notify(std::optional& promise) { if (promise) { promise->setValue(true); promise.reset(); @@ -215,7 +215,7 @@ BlockingReason MergeJoinSource::next( return BlockingReason::kNotBlocked; } - consumerPromise_ = VeloxPromise("MergeJoinSource::next"); + consumerPromise_ = ContinuePromise("MergeJoinSource::next"); *future = consumerPromise_->getSemiFuture(); return BlockingReason::kWaitForExchange; }); @@ -242,7 +242,7 @@ BlockingReason MergeJoinSource::enqueue( state.data = std::move(data); notify(consumerPromise_); - producerPromise_ = VeloxPromise("MergeJoinSource::enqueue"); + producerPromise_ = ContinuePromise("MergeJoinSource::enqueue"); *future = producerPromise_->getSemiFuture(); return BlockingReason::kWaitForConsumer; }); diff --git a/velox/exec/MergeSource.h b/velox/exec/MergeSource.h index 77b2e10ac716..1661fa256b51 100644 --- a/velox/exec/MergeSource.h +++ b/velox/exec/MergeSource.h @@ -65,11 +65,11 @@ class MergeJoinSource { // Satisfied when data becomes available or the producer reports that it // finished producing, e.g. state_.data is not nullptr or state_.atEnd is // true. - std::optional> consumerPromise_; + std::optional consumerPromise_; // Satisfied when previously enqueued data has been consumed, e.g. state_.data // is nullptr. - std::optional> producerPromise_; + std::optional producerPromise_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/PartitionedOutputBufferManager.cpp b/velox/exec/PartitionedOutputBufferManager.cpp index 1bbad17001d0..627fd4067b14 100644 --- a/velox/exec/PartitionedOutputBufferManager.cpp +++ b/velox/exec/PartitionedOutputBufferManager.cpp @@ -138,7 +138,7 @@ namespace { // producers which will allocate more memory. void releaseAfterAcknowledge( std::vector>& freed, - std::vector>& promises) { + std::vector& promises) { freed.clear(); for (auto& promise : promises) { promise.setValue(true); @@ -167,7 +167,7 @@ void PartitionedOutputBuffer::updateBroadcastOutputBuffers( bool noMoreBuffers) { VELOX_CHECK(broadcast_); - std::vector> promises; + std::vector promises; { std::lock_guard l(mutex_); @@ -316,7 +316,7 @@ bool PartitionedOutputBuffer::isFinishedLocked() { void PartitionedOutputBuffer::acknowledge(int destination, int64_t sequence) { std::vector> freed; - std::vector> promises; + std::vector promises; { std::lock_guard l(mutex_); VELOX_CHECK_LT(destination, buffers_.size()); @@ -334,7 +334,7 @@ void PartitionedOutputBuffer::acknowledge(int destination, int64_t sequence) { void PartitionedOutputBuffer::updateAfterAcknowledgeLocked( const std::vector>& freed, - std::vector>& promises) { + std::vector& promises) { uint64_t totalFreed = 0; for (const auto& free : freed) { if (free.unique()) { @@ -359,7 +359,7 @@ void PartitionedOutputBuffer::updateAfterAcknowledgeLocked( bool PartitionedOutputBuffer::deleteResults(int destination) { std::vector> freed; - std::vector> promises; + std::vector promises; bool isFinished; { std::lock_guard l(mutex_); @@ -393,7 +393,7 @@ void PartitionedOutputBuffer::getData( DataAvailableCallback notify) { std::vector> data; std::vector> freed; - std::vector> promises; + std::vector promises; { std::lock_guard l(mutex_); diff --git a/velox/exec/PartitionedOutputBufferManager.h b/velox/exec/PartitionedOutputBufferManager.h index dac60b31fd04..a3984ea811b0 100644 --- a/velox/exec/PartitionedOutputBufferManager.h +++ b/velox/exec/PartitionedOutputBufferManager.h @@ -153,7 +153,7 @@ class PartitionedOutputBuffer { // 'promises'. void updateAfterAcknowledgeLocked( const std::vector>& freed, - std::vector>& promises); + std::vector& promises); /// Given an updated total number of broadcast buffers, add any missing ones /// and enqueue data that has been produced so far (e.g. dataToBroadcast_). @@ -181,7 +181,7 @@ class PartitionedOutputBuffer { std::mutex mutex_; // Actual data size in 'buffers_'. uint64_t totalSize_ = 0; - std::vector> promises_; + std::vector promises_; // One buffer per destination std::vector> buffers_; uint32_t numFinished_{0}; diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index ca3501b5f353..debd0d305084 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -891,7 +891,7 @@ bool Task::allPeersFinished( const core::PlanNodeId& planNodeId, Driver* caller, ContinueFuture* future, - std::vector>& promises, + std::vector& promises, std::vector>& peers) { std::lock_guard l(mutex_); if (exception_) { @@ -993,8 +993,8 @@ std::string Task::shortId(const std::string& id) { /// Moves split promises from one vector to another. static void movePromisesOut( - std::vector>& from, - std::vector>& to) { + std::vector& from, + std::vector& to) { for (auto& promise : from) { to.push_back(std::move(promise)); } diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 68901a029729..521add4305fb 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -31,8 +31,6 @@ class PartitionedOutputBufferManager; class HashJoinBridge; class CrossJoinBridge; -using ContinuePromise = VeloxPromise; - class Task : public std::enable_shared_from_this { public: /// Creates a task to execute a plan fragment, but doesn't start execution @@ -333,7 +331,7 @@ class Task : public std::enable_shared_from_this { const core::PlanNodeId& planNodeId, Driver* FOLLY_NONNULL caller, ContinueFuture* FOLLY_NONNULL future, - std::vector>& promises, + std::vector& promises, std::vector>& peers); // Adds HashJoinBridge's for all the specified plan node IDs. @@ -654,7 +652,7 @@ class Task : public std::enable_shared_from_this { /// Stores separate splits state for each plan node. std::unordered_map splitsStates_; - std::vector> stateChangePromises_; + std::vector stateChangePromises_; TaskStats taskStats_; std::unique_ptr pool_; @@ -686,7 +684,7 @@ class Task : public std::enable_shared_from_this { // Promises for the futures returned to callers of requestPause() or // terminate(). They are fulfilled when the last thread stops // running for 'this'. - std::vector> threadFinishPromises_; + std::vector threadFinishPromises_; }; /// Listener invoked on task completion. diff --git a/velox/exec/TaskStructs.h b/velox/exec/TaskStructs.h index f9d586ec0b54..f2890dedeea2 100644 --- a/velox/exec/TaskStructs.h +++ b/velox/exec/TaskStructs.h @@ -34,7 +34,7 @@ enum TaskState { kRunning, kFinished, kCanceled, kAborted, kFailed }; struct BarrierState { int32_t numRequested; std::vector> drivers; - std::vector> promises; + std::vector promises; }; /// Structure to accumulate splits for distribution. @@ -44,7 +44,7 @@ struct SplitsStore { /// Signal, that no more splits will arrive. bool noMoreSplits{false}; /// Blocking promises given out when out of splits to distribute. - std::vector> splitPromises; + std::vector splitPromises; }; /// Structure contains the current info on splits for a particular plan node. diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index 6f1fbd3ba718..1d85aa6f05f3 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -57,7 +57,7 @@ exec::BlockingReason TaskQueue::enqueue( RowVectorPtr TaskQueue::dequeue() { for (;;) { RowVectorPtr vector; - std::vector> mayContinue; + std::vector mayContinue; { std::lock_guard l(mutex_); if (!queue_.empty()) { @@ -74,7 +74,7 @@ RowVectorPtr TaskQueue::dequeue() { } if (!vector) { consumerBlocked_ = true; - consumerPromise_ = VeloxPromise(); + consumerPromise_ = ContinuePromise(); consumerFuture_ = consumerPromise_.getFuture(); } } diff --git a/velox/exec/tests/utils/Cursor.h b/velox/exec/tests/utils/Cursor.h index c4eb00274654..5920b1de9910 100644 --- a/velox/exec/tests/utils/Cursor.h +++ b/velox/exec/tests/utils/Cursor.h @@ -92,9 +92,9 @@ class TaskQueue { // adding the result. uint64_t maxBytes_; std::mutex mutex_; - std::vector> producerUnblockPromises_; + std::vector producerUnblockPromises_; bool consumerBlocked_ = false; - VeloxPromise consumerPromise_; + ContinuePromise consumerPromise_; folly::Future consumerFuture_; bool closed_ = false; };