Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ContinuePromise #1410

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions velox/common/future/VeloxPromise.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,7 @@ std::pair<VeloxPromise<T>, folly::SemiFuture<T>> makeVeloxPromiseContract(
auto f = p.getSemiFuture();
return std::make_pair(std::move(p), std::move(f));
}

using ContinuePromise = VeloxPromise<bool>;

} // namespace facebook::velox
4 changes: 2 additions & 2 deletions velox/exec/CrossJoinBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
namespace facebook::velox::exec {

void CrossJoinBridge::setData(std::vector<VectorPtr> data) {
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(!data_.has_value(), "setData may be called only once");
Expand Down Expand Up @@ -72,7 +72,7 @@ BlockingReason CrossJoinBuild::isBlocked(ContinueFuture* future) {

void CrossJoinBuild::noMoreInput() {
Operator::noMoreInput();
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
std::vector<std::shared_ptr<Driver>> peers;
// The last Driver to hit CrossJoinBuild::finish gathers the data from
// all build Drivers and hands it over to the probe side. At this
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class ExchangeQueue {
bool atEnd_ = false;
std::mutex mutex_;
std::deque<std::unique_ptr<SerializedPage>> queue_;
std::vector<VeloxPromise<bool>> promises_;
std::vector<ContinuePromise> promises_;
// When set, all promises will be realized and the next dequeue will
// throw an exception with this message.
std::string error_;
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace facebook::velox::exec {
void HashJoinBridge::setHashTable(std::unique_ptr<BaseHashTable> table) {
VELOX_CHECK(table, "setHashTable called with null table");

std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(!table_, "setHashTable may be called only once");
Expand All @@ -35,7 +35,7 @@ void HashJoinBridge::setHashTable(std::unique_ptr<BaseHashTable> table) {
}

void HashJoinBridge::setAntiJoinHasNullKeys() {
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(
Expand Down Expand Up @@ -191,7 +191,7 @@ void HashBuild::noMoreInput() {
}

Operator::noMoreInput();
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
std::vector<std::shared_ptr<Driver>> peers;
// The last Driver to hit HashBuild::finish gathers the data from
// all build Drivers and hands it over to the probe side. At this
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ void HashProbe::ensureLoadedIfNotAtEnd(ChannelIndex channel) {
void HashProbe::noMoreInput() {
Operator::noMoreInput();
if (isRightJoin(joinType_) || isFullJoin(joinType_)) {
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
std::vector<std::shared_ptr<Driver>> peers;
// The last Driver to hit HashProbe::finish is responsible for producing
// non-matching build-side rows for the right join.
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/JoinBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
namespace facebook::velox::exec {

// static
void JoinBridge::notify(std::vector<VeloxPromise<bool>> promises) {
void JoinBridge::notify(std::vector<ContinuePromise> promises) {
for (auto& promise : promises) {
promise.setValue(true);
}
}

void JoinBridge::cancel() {
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
cancelled_ = true;
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/JoinBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ class JoinBridge {
void cancel();

protected:
static void notify(std::vector<VeloxPromise<bool>> promises);
static void notify(std::vector<ContinuePromise> promises);

std::mutex mutex_;
std::vector<VeloxPromise<bool>> promises_;
std::vector<ContinuePromise> promises_;
bool cancelled_{false};
};
} // namespace facebook::velox::exec
20 changes: 10 additions & 10 deletions velox/exec/LocalPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

namespace facebook::velox::exec {
namespace {
void notify(std::vector<VeloxPromise<bool>>& promises) {
void notify(std::vector<ContinuePromise>& promises) {
for (auto& promise : promises) {
promise.setValue(true);
}
Expand All @@ -42,7 +42,7 @@ bool LocalExchangeMemoryManager::increaseMemoryUsage(
}

void LocalExchangeMemoryManager::decreaseMemoryUsage(int64_t removed) {
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
bufferedBytes_ -= removed;
Expand All @@ -64,8 +64,8 @@ void LocalExchangeQueue::addProducer() {
}

void LocalExchangeQueue::noMoreProducers() {
std::vector<VeloxPromise<bool>> consumerPromises;
std::vector<VeloxPromise<bool>> producerPromises;
std::vector<ContinuePromise> consumerPromises;
std::vector<ContinuePromise> producerPromises;
queue_.withWLock([&](auto& queue) {
VELOX_CHECK(!noMoreProducers_, "noMoreProducers can be called only once");
noMoreProducers_ = true;
Expand All @@ -88,7 +88,7 @@ BlockingReason LocalExchangeQueue::enqueue(
ContinueFuture* future) {
auto inputBytes = input->retainedSize();

std::vector<VeloxPromise<bool>> consumerPromises;
std::vector<ContinuePromise> consumerPromises;
bool isClosed = queue_.withWLock([&](auto& queue) {
if (closed_) {
return true;
Expand All @@ -112,8 +112,8 @@ BlockingReason LocalExchangeQueue::enqueue(
}

void LocalExchangeQueue::noMoreData() {
std::vector<VeloxPromise<bool>> consumerPromises;
std::vector<VeloxPromise<bool>> producerPromises;
std::vector<ContinuePromise> consumerPromises;
std::vector<ContinuePromise> producerPromises;
queue_.withWLock([&](auto queue) {
VELOX_CHECK_GT(pendingProducers_, 0);
--pendingProducers_;
Expand All @@ -132,7 +132,7 @@ BlockingReason LocalExchangeQueue::next(
ContinueFuture* future,
memory::MemoryPool* pool,
RowVectorPtr* data) {
std::vector<VeloxPromise<bool>> producerPromises;
std::vector<ContinuePromise> producerPromises;
auto blockingReason = queue_.withWLock([&](auto& queue) {
*data = nullptr;
if (queue.empty()) {
Expand Down Expand Up @@ -192,8 +192,8 @@ bool LocalExchangeQueue::isFinished() {
}

void LocalExchangeQueue::close() {
std::vector<VeloxPromise<bool>> producerPromises;
std::vector<VeloxPromise<bool>> consumerPromises;
std::vector<ContinuePromise> producerPromises;
std::vector<ContinuePromise> consumerPromises;
queue_.withWLock([&](auto& queue) {
uint64_t freedBytes = 0;
while (!queue.empty()) {
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/LocalPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class LocalExchangeMemoryManager {
const int64_t maxBufferSize_;
std::mutex mutex_;
int64_t bufferedBytes_{0};
std::vector<VeloxPromise<bool>> promises_;
std::vector<ContinuePromise> promises_;
};

/// Buffers data for a single partition produced by local exchange. Allows
Expand Down Expand Up @@ -102,11 +102,11 @@ class LocalExchangeQueue {
// Satisfied when data becomes available or all producers report that they
// finished producing, e.g. queue_ is not empty or noMoreProducers_ is true
// and pendingProducers_ is zero.
std::vector<VeloxPromise<bool>> consumerPromises_;
std::vector<ContinuePromise> consumerPromises_;
// Satisfied when all data has been fetched and no more data will be produced,
// e.g. queue_ is empty, noMoreProducers_ is true and pendingProducers_ is
// zero.
std::vector<VeloxPromise<bool>> producerPromises_;
std::vector<ContinuePromise> producerPromises_;
int pendingProducers_{0};
bool noMoreProducers_{false};
bool closed_{false};
Expand Down
10 changes: 5 additions & 5 deletions velox/exec/MergeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ class LocalMergeSource : public MergeSource {

bool atEnd_{false};
boost::circular_buffer<RowVectorPtr> data_;
std::vector<VeloxPromise<bool>> consumerPromises_;
std::vector<VeloxPromise<bool>> producerPromises_;
std::vector<ContinuePromise> consumerPromises_;
std::vector<ContinuePromise> producerPromises_;
};

folly::Synchronized<LocalMergeSourceQueue> queue_;
Expand Down Expand Up @@ -192,7 +192,7 @@ std::shared_ptr<MergeSource> MergeSource::createMergeExchangeSource(
}

namespace {
void notify(std::optional<VeloxPromise<bool>>& promise) {
void notify(std::optional<ContinuePromise>& promise) {
if (promise) {
promise->setValue(true);
promise.reset();
Expand All @@ -215,7 +215,7 @@ BlockingReason MergeJoinSource::next(
return BlockingReason::kNotBlocked;
}

consumerPromise_ = VeloxPromise<bool>("MergeJoinSource::next");
consumerPromise_ = ContinuePromise("MergeJoinSource::next");
*future = consumerPromise_->getSemiFuture();
return BlockingReason::kWaitForExchange;
});
Expand All @@ -242,7 +242,7 @@ BlockingReason MergeJoinSource::enqueue(
state.data = std::move(data);
notify(consumerPromise_);

producerPromise_ = VeloxPromise<bool>("MergeJoinSource::enqueue");
producerPromise_ = ContinuePromise("MergeJoinSource::enqueue");
*future = producerPromise_->getSemiFuture();
return BlockingReason::kWaitForConsumer;
});
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/MergeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ class MergeJoinSource {
// Satisfied when data becomes available or the producer reports that it
// finished producing, e.g. state_.data is not nullptr or state_.atEnd is
// true.
std::optional<VeloxPromise<bool>> consumerPromise_;
std::optional<ContinuePromise> consumerPromise_;

// Satisfied when previously enqueued data has been consumed, e.g. state_.data
// is nullptr.
std::optional<VeloxPromise<bool>> producerPromise_;
std::optional<ContinuePromise> producerPromise_;
};

} // namespace facebook::velox::exec
12 changes: 6 additions & 6 deletions velox/exec/PartitionedOutputBufferManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ namespace {
// producers which will allocate more memory.
void releaseAfterAcknowledge(
std::vector<std::shared_ptr<SerializedPage>>& freed,
std::vector<VeloxPromise<bool>>& promises) {
std::vector<ContinuePromise>& promises) {
freed.clear();
for (auto& promise : promises) {
promise.setValue(true);
Expand Down Expand Up @@ -167,7 +167,7 @@ void PartitionedOutputBuffer::updateBroadcastOutputBuffers(
bool noMoreBuffers) {
VELOX_CHECK(broadcast_);

std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);

Expand Down Expand Up @@ -316,7 +316,7 @@ bool PartitionedOutputBuffer::isFinishedLocked() {

void PartitionedOutputBuffer::acknowledge(int destination, int64_t sequence) {
std::vector<std::shared_ptr<SerializedPage>> freed;
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK_LT(destination, buffers_.size());
Expand All @@ -334,7 +334,7 @@ void PartitionedOutputBuffer::acknowledge(int destination, int64_t sequence) {

void PartitionedOutputBuffer::updateAfterAcknowledgeLocked(
const std::vector<std::shared_ptr<SerializedPage>>& freed,
std::vector<VeloxPromise<bool>>& promises) {
std::vector<ContinuePromise>& promises) {
uint64_t totalFreed = 0;
for (const auto& free : freed) {
if (free.unique()) {
Expand All @@ -359,7 +359,7 @@ void PartitionedOutputBuffer::updateAfterAcknowledgeLocked(

bool PartitionedOutputBuffer::deleteResults(int destination) {
std::vector<std::shared_ptr<SerializedPage>> freed;
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
bool isFinished;
{
std::lock_guard<std::mutex> l(mutex_);
Expand Down Expand Up @@ -393,7 +393,7 @@ void PartitionedOutputBuffer::getData(
DataAvailableCallback notify) {
std::vector<std::shared_ptr<SerializedPage>> data;
std::vector<std::shared_ptr<SerializedPage>> freed;
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/PartitionedOutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class PartitionedOutputBuffer {
// 'promises'.
void updateAfterAcknowledgeLocked(
const std::vector<std::shared_ptr<SerializedPage>>& freed,
std::vector<VeloxPromise<bool>>& promises);
std::vector<ContinuePromise>& promises);

/// Given an updated total number of broadcast buffers, add any missing ones
/// and enqueue data that has been produced so far (e.g. dataToBroadcast_).
Expand Down Expand Up @@ -181,7 +181,7 @@ class PartitionedOutputBuffer {
std::mutex mutex_;
// Actual data size in 'buffers_'.
uint64_t totalSize_ = 0;
std::vector<VeloxPromise<bool>> promises_;
std::vector<ContinuePromise> promises_;
// One buffer per destination
std::vector<std::unique_ptr<DestinationBuffer>> buffers_;
uint32_t numFinished_{0};
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ bool Task::allPeersFinished(
const core::PlanNodeId& planNodeId,
Driver* caller,
ContinueFuture* future,
std::vector<VeloxPromise<bool>>& promises,
std::vector<ContinuePromise>& promises,
std::vector<std::shared_ptr<Driver>>& peers) {
std::lock_guard<std::mutex> l(mutex_);
if (exception_) {
Expand Down Expand Up @@ -993,8 +993,8 @@ std::string Task::shortId(const std::string& id) {

/// Moves split promises from one vector to another.
static void movePromisesOut(
std::vector<VeloxPromise<bool>>& from,
std::vector<VeloxPromise<bool>>& to) {
std::vector<ContinuePromise>& from,
std::vector<ContinuePromise>& to) {
for (auto& promise : from) {
to.push_back(std::move(promise));
}
Expand Down
8 changes: 3 additions & 5 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ class PartitionedOutputBufferManager;
class HashJoinBridge;
class CrossJoinBridge;

using ContinuePromise = VeloxPromise<bool>;

class Task : public std::enable_shared_from_this<Task> {
public:
/// Creates a task to execute a plan fragment, but doesn't start execution
Expand Down Expand Up @@ -333,7 +331,7 @@ class Task : public std::enable_shared_from_this<Task> {
const core::PlanNodeId& planNodeId,
Driver* FOLLY_NONNULL caller,
ContinueFuture* FOLLY_NONNULL future,
std::vector<VeloxPromise<bool>>& promises,
std::vector<ContinuePromise>& promises,
std::vector<std::shared_ptr<Driver>>& peers);

// Adds HashJoinBridge's for all the specified plan node IDs.
Expand Down Expand Up @@ -654,7 +652,7 @@ class Task : public std::enable_shared_from_this<Task> {
/// Stores separate splits state for each plan node.
std::unordered_map<core::PlanNodeId, SplitsState> splitsStates_;

std::vector<VeloxPromise<bool>> stateChangePromises_;
std::vector<ContinuePromise> stateChangePromises_;

TaskStats taskStats_;
std::unique_ptr<memory::MemoryPool> pool_;
Expand Down Expand Up @@ -686,7 +684,7 @@ class Task : public std::enable_shared_from_this<Task> {
// Promises for the futures returned to callers of requestPause() or
// terminate(). They are fulfilled when the last thread stops
// running for 'this'.
std::vector<VeloxPromise<bool>> threadFinishPromises_;
std::vector<ContinuePromise> threadFinishPromises_;
};

/// Listener invoked on task completion.
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/TaskStructs.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ enum TaskState { kRunning, kFinished, kCanceled, kAborted, kFailed };
struct BarrierState {
int32_t numRequested;
std::vector<std::shared_ptr<Driver>> drivers;
std::vector<VeloxPromise<bool>> promises;
std::vector<ContinuePromise> promises;
};

/// Structure to accumulate splits for distribution.
Expand All @@ -44,7 +44,7 @@ struct SplitsStore {
/// Signal, that no more splits will arrive.
bool noMoreSplits{false};
/// Blocking promises given out when out of splits to distribute.
std::vector<VeloxPromise<bool>> splitPromises;
std::vector<ContinuePromise> splitPromises;
};

/// Structure contains the current info on splits for a particular plan node.
Expand Down
Loading