Skip to content

Commit

Permalink
Use ContinuePromise
Browse files Browse the repository at this point in the history
  • Loading branch information
mbasmanova committed Apr 13, 2022
1 parent 2b1dd9f commit cf866b7
Show file tree
Hide file tree
Showing 18 changed files with 54 additions and 53 deletions.
3 changes: 3 additions & 0 deletions velox/common/future/VeloxPromise.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,7 @@ std::pair<VeloxPromise<T>, folly::SemiFuture<T>> makeVeloxPromiseContract(
auto f = p.getSemiFuture();
return std::make_pair(std::move(p), std::move(f));
}

using ContinuePromise = VeloxPromise<bool>;

} // namespace facebook::velox
4 changes: 2 additions & 2 deletions velox/exec/CrossJoinBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
namespace facebook::velox::exec {

void CrossJoinBridge::setData(std::vector<VectorPtr> data) {
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(!data_.has_value(), "setData may be called only once");
Expand Down Expand Up @@ -72,7 +72,7 @@ BlockingReason CrossJoinBuild::isBlocked(ContinueFuture* future) {

void CrossJoinBuild::noMoreInput() {
Operator::noMoreInput();
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
std::vector<std::shared_ptr<Driver>> peers;
// The last Driver to hit CrossJoinBuild::finish gathers the data from
// all build Drivers and hands it over to the probe side. At this
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class ExchangeQueue {
bool atEnd_ = false;
std::mutex mutex_;
std::deque<std::unique_ptr<SerializedPage>> queue_;
std::vector<VeloxPromise<bool>> promises_;
std::vector<ContinuePromise> promises_;
// When set, all promises will be realized and the next dequeue will
// throw an exception with this message.
std::string error_;
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace facebook::velox::exec {
void HashJoinBridge::setHashTable(std::unique_ptr<BaseHashTable> table) {
VELOX_CHECK(table, "setHashTable called with null table");

std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(!table_, "setHashTable may be called only once");
Expand All @@ -35,7 +35,7 @@ void HashJoinBridge::setHashTable(std::unique_ptr<BaseHashTable> table) {
}

void HashJoinBridge::setAntiJoinHasNullKeys() {
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(
Expand Down Expand Up @@ -191,7 +191,7 @@ void HashBuild::noMoreInput() {
}

Operator::noMoreInput();
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
std::vector<std::shared_ptr<Driver>> peers;
// The last Driver to hit HashBuild::finish gathers the data from
// all build Drivers and hands it over to the probe side. At this
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ void HashProbe::ensureLoadedIfNotAtEnd(ChannelIndex channel) {
void HashProbe::noMoreInput() {
Operator::noMoreInput();
if (isRightJoin(joinType_) || isFullJoin(joinType_)) {
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
std::vector<std::shared_ptr<Driver>> peers;
// The last Driver to hit HashProbe::finish is responsible for producing
// non-matching build-side rows for the right join.
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/JoinBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
namespace facebook::velox::exec {

// static
void JoinBridge::notify(std::vector<VeloxPromise<bool>> promises) {
void JoinBridge::notify(std::vector<ContinuePromise> promises) {
for (auto& promise : promises) {
promise.setValue(true);
}
}

void JoinBridge::cancel() {
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
cancelled_ = true;
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/JoinBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ class JoinBridge {
void cancel();

protected:
static void notify(std::vector<VeloxPromise<bool>> promises);
static void notify(std::vector<ContinuePromise> promises);

std::mutex mutex_;
std::vector<VeloxPromise<bool>> promises_;
std::vector<ContinuePromise> promises_;
bool cancelled_{false};
};
} // namespace facebook::velox::exec
20 changes: 10 additions & 10 deletions velox/exec/LocalPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

namespace facebook::velox::exec {
namespace {
void notify(std::vector<VeloxPromise<bool>>& promises) {
void notify(std::vector<ContinuePromise>& promises) {
for (auto& promise : promises) {
promise.setValue(true);
}
Expand All @@ -42,7 +42,7 @@ bool LocalExchangeMemoryManager::increaseMemoryUsage(
}

void LocalExchangeMemoryManager::decreaseMemoryUsage(int64_t removed) {
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
bufferedBytes_ -= removed;
Expand All @@ -64,8 +64,8 @@ void LocalExchangeQueue::addProducer() {
}

void LocalExchangeQueue::noMoreProducers() {
std::vector<VeloxPromise<bool>> consumerPromises;
std::vector<VeloxPromise<bool>> producerPromises;
std::vector<ContinuePromise> consumerPromises;
std::vector<ContinuePromise> producerPromises;
queue_.withWLock([&](auto& queue) {
VELOX_CHECK(!noMoreProducers_, "noMoreProducers can be called only once");
noMoreProducers_ = true;
Expand All @@ -88,7 +88,7 @@ BlockingReason LocalExchangeQueue::enqueue(
ContinueFuture* future) {
auto inputBytes = input->retainedSize();

std::vector<VeloxPromise<bool>> consumerPromises;
std::vector<ContinuePromise> consumerPromises;
bool isClosed = queue_.withWLock([&](auto& queue) {
if (closed_) {
return true;
Expand All @@ -112,8 +112,8 @@ BlockingReason LocalExchangeQueue::enqueue(
}

void LocalExchangeQueue::noMoreData() {
std::vector<VeloxPromise<bool>> consumerPromises;
std::vector<VeloxPromise<bool>> producerPromises;
std::vector<ContinuePromise> consumerPromises;
std::vector<ContinuePromise> producerPromises;
queue_.withWLock([&](auto queue) {
VELOX_CHECK_GT(pendingProducers_, 0);
--pendingProducers_;
Expand All @@ -132,7 +132,7 @@ BlockingReason LocalExchangeQueue::next(
ContinueFuture* future,
memory::MemoryPool* pool,
RowVectorPtr* data) {
std::vector<VeloxPromise<bool>> producerPromises;
std::vector<ContinuePromise> producerPromises;
auto blockingReason = queue_.withWLock([&](auto& queue) {
*data = nullptr;
if (queue.empty()) {
Expand Down Expand Up @@ -192,8 +192,8 @@ bool LocalExchangeQueue::isFinished() {
}

void LocalExchangeQueue::close() {
std::vector<VeloxPromise<bool>> producerPromises;
std::vector<VeloxPromise<bool>> consumerPromises;
std::vector<ContinuePromise> producerPromises;
std::vector<ContinuePromise> consumerPromises;
queue_.withWLock([&](auto& queue) {
uint64_t freedBytes = 0;
while (!queue.empty()) {
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/LocalPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class LocalExchangeMemoryManager {
const int64_t maxBufferSize_;
std::mutex mutex_;
int64_t bufferedBytes_{0};
std::vector<VeloxPromise<bool>> promises_;
std::vector<ContinuePromise> promises_;
};

/// Buffers data for a single partition produced by local exchange. Allows
Expand Down Expand Up @@ -102,11 +102,11 @@ class LocalExchangeQueue {
// Satisfied when data becomes available or all producers report that they
// finished producing, e.g. queue_ is not empty or noMoreProducers_ is true
// and pendingProducers_ is zero.
std::vector<VeloxPromise<bool>> consumerPromises_;
std::vector<ContinuePromise> consumerPromises_;
// Satisfied when all data has been fetched and no more data will be produced,
// e.g. queue_ is empty, noMoreProducers_ is true and pendingProducers_ is
// zero.
std::vector<VeloxPromise<bool>> producerPromises_;
std::vector<ContinuePromise> producerPromises_;
int pendingProducers_{0};
bool noMoreProducers_{false};
bool closed_{false};
Expand Down
10 changes: 5 additions & 5 deletions velox/exec/MergeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ class LocalMergeSource : public MergeSource {

bool atEnd_{false};
boost::circular_buffer<RowVectorPtr> data_;
std::vector<VeloxPromise<bool>> consumerPromises_;
std::vector<VeloxPromise<bool>> producerPromises_;
std::vector<ContinuePromise> consumerPromises_;
std::vector<ContinuePromise> producerPromises_;
};

folly::Synchronized<LocalMergeSourceQueue> queue_;
Expand Down Expand Up @@ -192,7 +192,7 @@ std::shared_ptr<MergeSource> MergeSource::createMergeExchangeSource(
}

namespace {
void notify(std::optional<VeloxPromise<bool>>& promise) {
void notify(std::optional<ContinuePromise>& promise) {
if (promise) {
promise->setValue(true);
promise.reset();
Expand All @@ -215,7 +215,7 @@ BlockingReason MergeJoinSource::next(
return BlockingReason::kNotBlocked;
}

consumerPromise_ = VeloxPromise<bool>("MergeJoinSource::next");
consumerPromise_ = ContinuePromise("MergeJoinSource::next");
*future = consumerPromise_->getSemiFuture();
return BlockingReason::kWaitForExchange;
});
Expand All @@ -242,7 +242,7 @@ BlockingReason MergeJoinSource::enqueue(
state.data = std::move(data);
notify(consumerPromise_);

producerPromise_ = VeloxPromise<bool>("MergeJoinSource::enqueue");
producerPromise_ = ContinuePromise("MergeJoinSource::enqueue");
*future = producerPromise_->getSemiFuture();
return BlockingReason::kWaitForConsumer;
});
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/MergeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ class MergeJoinSource {
// Satisfied when data becomes available or the producer reports that it
// finished producing, e.g. state_.data is not nullptr or state_.atEnd is
// true.
std::optional<VeloxPromise<bool>> consumerPromise_;
std::optional<ContinuePromise> consumerPromise_;

// Satisfied when previously enqueued data has been consumed, e.g. state_.data
// is nullptr.
std::optional<VeloxPromise<bool>> producerPromise_;
std::optional<ContinuePromise> producerPromise_;
};

} // namespace facebook::velox::exec
12 changes: 6 additions & 6 deletions velox/exec/PartitionedOutputBufferManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ namespace {
// producers which will allocate more memory.
void releaseAfterAcknowledge(
std::vector<std::shared_ptr<SerializedPage>>& freed,
std::vector<VeloxPromise<bool>>& promises) {
std::vector<ContinuePromise>& promises) {
freed.clear();
for (auto& promise : promises) {
promise.setValue(true);
Expand Down Expand Up @@ -167,7 +167,7 @@ void PartitionedOutputBuffer::updateBroadcastOutputBuffers(
bool noMoreBuffers) {
VELOX_CHECK(broadcast_);

std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);

Expand Down Expand Up @@ -316,7 +316,7 @@ bool PartitionedOutputBuffer::isFinishedLocked() {

void PartitionedOutputBuffer::acknowledge(int destination, int64_t sequence) {
std::vector<std::shared_ptr<SerializedPage>> freed;
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK_LT(destination, buffers_.size());
Expand All @@ -334,7 +334,7 @@ void PartitionedOutputBuffer::acknowledge(int destination, int64_t sequence) {

void PartitionedOutputBuffer::updateAfterAcknowledgeLocked(
const std::vector<std::shared_ptr<SerializedPage>>& freed,
std::vector<VeloxPromise<bool>>& promises) {
std::vector<ContinuePromise>& promises) {
uint64_t totalFreed = 0;
for (const auto& free : freed) {
if (free.unique()) {
Expand All @@ -359,7 +359,7 @@ void PartitionedOutputBuffer::updateAfterAcknowledgeLocked(

bool PartitionedOutputBuffer::deleteResults(int destination) {
std::vector<std::shared_ptr<SerializedPage>> freed;
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
bool isFinished;
{
std::lock_guard<std::mutex> l(mutex_);
Expand Down Expand Up @@ -393,7 +393,7 @@ void PartitionedOutputBuffer::getData(
DataAvailableCallback notify) {
std::vector<std::shared_ptr<SerializedPage>> data;
std::vector<std::shared_ptr<SerializedPage>> freed;
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/PartitionedOutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class PartitionedOutputBuffer {
// 'promises'.
void updateAfterAcknowledgeLocked(
const std::vector<std::shared_ptr<SerializedPage>>& freed,
std::vector<VeloxPromise<bool>>& promises);
std::vector<ContinuePromise>& promises);

/// Given an updated total number of broadcast buffers, add any missing ones
/// and enqueue data that has been produced so far (e.g. dataToBroadcast_).
Expand Down Expand Up @@ -181,7 +181,7 @@ class PartitionedOutputBuffer {
std::mutex mutex_;
// Actual data size in 'buffers_'.
uint64_t totalSize_ = 0;
std::vector<VeloxPromise<bool>> promises_;
std::vector<ContinuePromise> promises_;
// One buffer per destination
std::vector<std::unique_ptr<DestinationBuffer>> buffers_;
uint32_t numFinished_{0};
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ bool Task::allPeersFinished(
const core::PlanNodeId& planNodeId,
Driver* caller,
ContinueFuture* future,
std::vector<VeloxPromise<bool>>& promises,
std::vector<ContinuePromise>& promises,
std::vector<std::shared_ptr<Driver>>& peers) {
std::lock_guard<std::mutex> l(mutex_);
if (exception_) {
Expand Down Expand Up @@ -972,8 +972,8 @@ std::string Task::shortId(const std::string& id) {

/// Moves split promises from one vector to another.
static void movePromisesOut(
std::vector<VeloxPromise<bool>>& from,
std::vector<VeloxPromise<bool>>& to) {
std::vector<ContinuePromise>& from,
std::vector<ContinuePromise>& to) {
for (auto& promise : from) {
to.push_back(std::move(promise));
}
Expand Down
8 changes: 3 additions & 5 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ class PartitionedOutputBufferManager;
class HashJoinBridge;
class CrossJoinBridge;

using ContinuePromise = VeloxPromise<bool>;

class Task : public std::enable_shared_from_this<Task> {
public:
/// Creates a task to execute a plan fragment, but doesn't start execution
Expand Down Expand Up @@ -333,7 +331,7 @@ class Task : public std::enable_shared_from_this<Task> {
const core::PlanNodeId& planNodeId,
Driver* FOLLY_NONNULL caller,
ContinueFuture* FOLLY_NONNULL future,
std::vector<VeloxPromise<bool>>& promises,
std::vector<ContinuePromise>& promises,
std::vector<std::shared_ptr<Driver>>& peers);

// Adds HashJoinBridge's for all the specified plan node IDs.
Expand Down Expand Up @@ -619,7 +617,7 @@ class Task : public std::enable_shared_from_this<Task> {
/// Stores separate splits state for each plan node.
std::unordered_map<core::PlanNodeId, SplitsState> splitsStates_;

std::vector<VeloxPromise<bool>> stateChangePromises_;
std::vector<ContinuePromise> stateChangePromises_;

TaskStats taskStats_;
std::unique_ptr<memory::MemoryPool> pool_;
Expand Down Expand Up @@ -651,7 +649,7 @@ class Task : public std::enable_shared_from_this<Task> {
// Promises for the futures returned to callers of requestPause() or
// terminate(). They are fulfilled when the last thread stops
// running for 'this'.
std::vector<VeloxPromise<bool>> threadFinishPromises_;
std::vector<ContinuePromise> threadFinishPromises_;
};

/// Listener invoked on task completion.
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/TaskStructs.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ enum TaskState { kRunning, kFinished, kCanceled, kAborted, kFailed };
struct BarrierState {
int32_t numRequested;
std::vector<std::shared_ptr<Driver>> drivers;
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
};

/// Structure to accumulate splits for distribution.
Expand All @@ -44,7 +44,7 @@ struct SplitsStore {
/// Signal, that no more splits will arrive.
bool noMoreSplits{false};
/// Blocking promises given out when out of splits to distribute.
std::vector<VeloxPromise<bool>> splitPromises;
std::vector<ContinuePromise> splitPromises;
};

/// Structure contains the current info on splits for a particular plan node.
Expand Down
Loading

0 comments on commit cf866b7

Please sign in to comment.