From 8738f85ee3e2f95f7ea3e30becafb1a8907b73e5 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Thu, 24 Oct 2024 23:38:27 -0700 Subject: [PATCH] misc: Improve OutputBufferManager initialization Starting from C++11, the C++ standard guarantees that the initialization of function-local static variables is thread-safe. This is better than using a global mutex, especially for subsequent getInstance() calls. This is because the overhead of using a static local variable only needs to do a simple check to see if the variable has already been initialized, while for the global mutex case, all getInstance() calls need to aquire this lock exclusively. --- velox/exec/OutputBufferManager.cpp | 19 ++++++++----------- velox/exec/OutputBufferManager.h | 9 ++------- velox/exec/PartitionedOutput.cpp | 2 +- velox/exec/Task.cpp | 2 +- velox/exec/tests/ExchangeClientTest.cpp | 2 +- velox/exec/tests/GroupedExecutionTest.cpp | 6 +++--- velox/exec/tests/LimitTest.cpp | 2 +- velox/exec/tests/MultiFragmentTest.cpp | 4 ++-- velox/exec/tests/OutputBufferManagerTest.cpp | 2 +- velox/exec/tests/PartitionedOutputTest.cpp | 2 +- velox/exec/tests/TaskTest.cpp | 2 +- .../exec/tests/utils/LocalExchangeSource.cpp | 6 +++--- velox/tool/trace/PartitionedOutputReplayer.h | 2 +- .../tests/PartitionedOutputReplayerTest.cpp | 2 +- 14 files changed, 27 insertions(+), 35 deletions(-) diff --git a/velox/exec/OutputBufferManager.cpp b/velox/exec/OutputBufferManager.cpp index 0425218d175a5..a78a26b257cc3 100644 --- a/velox/exec/OutputBufferManager.cpp +++ b/velox/exec/OutputBufferManager.cpp @@ -17,21 +17,18 @@ #include "velox/exec/Task.h" namespace facebook::velox::exec { + // static -void OutputBufferManager::initialize(const Options& options) { - std::lock_guard l(initMutex_); - VELOX_CHECK( - instance_ == nullptr, "May initialize OutputBufferManager only once"); - instance_ = std::make_shared(options); +std::weak_ptr OutputBufferManager::getInstance() { + return getInstanceRef(); } // static -std::weak_ptr OutputBufferManager::getInstance() { - std::lock_guard l(initMutex_); - if (!instance_) { - instance_ = std::make_shared(Options()); - } - return instance_; +const std::shared_ptr& +OutputBufferManager::getInstanceRef() { + static const std::shared_ptr instance = + std::make_shared(Options()); + return instance; } std::shared_ptr OutputBufferManager::getBuffer( diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h index 038d42cdce77c..c45c34292848f 100644 --- a/velox/exec/OutputBufferManager.h +++ b/velox/exec/OutputBufferManager.h @@ -94,12 +94,10 @@ class OutputBufferManager { void removeTask(const std::string& taskId); - /// Initializes singleton with 'options'. May be called once before - /// getInstance(). - static void initialize(const Options& options); - static std::weak_ptr getInstance(); + static const std::shared_ptr& getInstanceRef(); + uint64_t numBuffers() const; // Returns a new stream listener if a listener factory has been set. @@ -143,8 +141,5 @@ class OutputBufferManager { std::function()> listenerFactory_{ nullptr}; - - inline static std::shared_ptr instance_; - inline static std::mutex initMutex_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index a08114210245b..4d8a081f716c9 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -190,7 +190,7 @@ PartitionedOutput::PartitionedOutput( planNode->inputType(), planNode->outputType(), planNode->outputType())), - bufferManager_(OutputBufferManager::getInstance()), + bufferManager_(OutputBufferManager::getInstanceRef()), // NOTE: 'bufferReleaseFn_' holds a reference on the associated task to // prevent it from deleting while there are output buffers being accessed // out of the partitioned output buffer manager such as in Prestissimo, diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index aa6f0842f5dff..e597b59faf5e1 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -302,7 +302,7 @@ Task::Task( consumerSupplier_(std::move(consumerSupplier)), onError_(std::move(onError)), splitsStates_(buildSplitStates(planFragment_.planNode)), - bufferManager_(OutputBufferManager::getInstance()) { + bufferManager_(OutputBufferManager::getInstanceRef()) { // NOTE: the executor must not be folly::InlineLikeExecutor for parallel // execution. if (mode_ == Task::ExecutionMode::kParallel) { diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index 9870bc73ea34f..1d7b131fc35fe 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -62,7 +62,7 @@ class ExchangeClientTest if (!isRegisteredVectorSerde()) { velox::serializer::presto::PrestoVectorSerde::registerVectorSerde(); } - bufferManager_ = OutputBufferManager::getInstance().lock(); + bufferManager_ = OutputBufferManager::getInstanceRef(); common::testutil::TestValue::enable(); } diff --git a/velox/exec/tests/GroupedExecutionTest.cpp b/velox/exec/tests/GroupedExecutionTest.cpp index 0a7fc85cef71d..eec14a9e96615 100644 --- a/velox/exec/tests/GroupedExecutionTest.cpp +++ b/velox/exec/tests/GroupedExecutionTest.cpp @@ -287,7 +287,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) { // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstanceRef(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. @@ -471,7 +471,7 @@ DEBUG_ONLY_TEST_F( // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstanceRef(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. @@ -627,7 +627,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) { // 'Delete results' from output buffer triggers 'set all output consumed', // which should finish the task. - auto outputBufferManager = exec::OutputBufferManager::getInstance().lock(); + auto outputBufferManager = exec::OutputBufferManager::getInstanceRef(); outputBufferManager->deleteResults(task->taskId(), 0); // Task must be finished at this stage. diff --git a/velox/exec/tests/LimitTest.cpp b/velox/exec/tests/LimitTest.cpp index 68c27a5bafdd7..d1be2199c0068 100644 --- a/velox/exec/tests/LimitTest.cpp +++ b/velox/exec/tests/LimitTest.cpp @@ -122,7 +122,7 @@ TEST_F(LimitTest, partialLimitEagerFlush) { params.planNode = builder.partitionedOutput({}, 1).planNode(); auto cursor = TaskCursor::create(params); ASSERT_FALSE(cursor->moveNext()); - auto bufferManager = exec::OutputBufferManager::getInstance().lock(); + auto bufferManager = exec::OutputBufferManager::getInstanceRef(); auto [numPagesPromise, numPagesFuture] = folly::makePromiseContract(); ASSERT_TRUE(bufferManager->getData( cursor->task()->taskId(), diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index ba8dea170173d..c6d1287d606fa 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -209,7 +209,7 @@ class MultiFragmentTest : public HiveConnectorTestBase, std::vector> filePaths_; std::vector vectors_; std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstanceRef()}; }; TEST_P(MultiFragmentTest, aggregationSingleKey) { @@ -2379,7 +2379,7 @@ class DataFetcher { folly::EventCount bufferFullOrDoneWait_; std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstanceRef()}; }; /// Verify that POBM::getData() honors maxBytes parameter roughly at 1MB diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index 8ba91d12b6e2f..4825530017bc2 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -62,7 +62,7 @@ class OutputBufferManagerTest : public testing::Test { void SetUp() override { pool_ = facebook::velox::memory::memoryManager()->addLeafPool(); - bufferManager_ = OutputBufferManager::getInstance().lock(); + bufferManager_ = OutputBufferManager::getInstanceRef(); if (!isRegisteredVectorSerde()) { facebook::velox::serializer::presto::PrestoVectorSerde:: registerVectorSerde(); diff --git a/velox/exec/tests/PartitionedOutputTest.cpp b/velox/exec/tests/PartitionedOutputTest.cpp index 4ce97e38d02eb..be921e80c9135 100644 --- a/velox/exec/tests/PartitionedOutputTest.cpp +++ b/velox/exec/tests/PartitionedOutputTest.cpp @@ -91,7 +91,7 @@ class PartitionedOutputTest private: const std::shared_ptr bufferManager_{ - OutputBufferManager::getInstance().lock()}; + OutputBufferManager::getInstanceRef()}; }; TEST_P(PartitionedOutputTest, flush) { diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 58a1293859b5b..d12d7e474351a 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1045,7 +1045,7 @@ TEST_F(TaskTest, updateBroadCastOutputBuffers) { .project({"c0 % 10"}) .partitionedOutputBroadcast({}) .planFragment(); - auto bufferManager = OutputBufferManager::getInstance().lock(); + auto bufferManager = OutputBufferManager::getInstanceRef(); { auto task = Task::create( "t0", diff --git a/velox/exec/tests/utils/LocalExchangeSource.cpp b/velox/exec/tests/utils/LocalExchangeSource.cpp index 9d012c908c95e..3bb4979079514 100644 --- a/velox/exec/tests/utils/LocalExchangeSource.cpp +++ b/velox/exec/tests/utils/LocalExchangeSource.cpp @@ -52,7 +52,7 @@ class LocalExchangeSource : public exec::ExchangeSource { promise_ = std::move(promise); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstanceRef(); VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager"); VELOX_CHECK(requestPending_); auto requestedSequence = sequence_; @@ -164,7 +164,7 @@ class LocalExchangeSource : public exec::ExchangeSource { void pause() override { common::testutil::TestValue::adjust( "facebook::velox::exec::test::LocalExchangeSource::pause", nullptr); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstanceRef(); VELOX_CHECK_NOT_NULL(buffers, "invalid OutputBufferManager"); int64_t ackSequence; { @@ -177,7 +177,7 @@ class LocalExchangeSource : public exec::ExchangeSource { void close() override { checkSetRequestPromise(); - auto buffers = OutputBufferManager::getInstance().lock(); + auto buffers = OutputBufferManager::getInstanceRef(); buffers->deleteResults(taskId_, destination_); } diff --git a/velox/tool/trace/PartitionedOutputReplayer.h b/velox/tool/trace/PartitionedOutputReplayer.h index f9eb0c1cd9edd..99a450bc3a907 100644 --- a/velox/tool/trace/PartitionedOutputReplayer.h +++ b/velox/tool/trace/PartitionedOutputReplayer.h @@ -63,7 +63,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase { const core::PartitionedOutputNode* const originalNode_; const VectorSerde::Kind serdeKind_; const std::shared_ptr bufferManager_{ - exec::OutputBufferManager::getInstance().lock()}; + exec::OutputBufferManager::getInstanceRef()}; const std::unique_ptr executor_{ std::make_unique( std::thread::hardware_concurrency(), diff --git a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp index 46e2ddea5b35a..c15e8136389de 100644 --- a/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp +++ b/velox/tool/trace/tests/PartitionedOutputReplayerTest.cpp @@ -120,7 +120,7 @@ class PartitionedOutputReplayerTest } const std::shared_ptr bufferManager_{ - exec::OutputBufferManager::getInstance().lock()}; + exec::OutputBufferManager::getInstanceRef()}; }; TEST_P(PartitionedOutputReplayerTest, defaultConsumer) {