From 670ae5b10aaac3041dc5f493a8add721524784d6 Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Wed, 16 Nov 2022 14:36:06 -0800 Subject: [PATCH] [native] Shuffle related entities should have shared ownership to ShuffleInterface --- .../main/operators/ShuffleWrite.cpp | 2 +- .../presto_cpp/main/operators/ShuffleWrite.h | 6 ++--- .../main/operators/UnsafeRowExchangeSource.h | 6 ++--- .../operators/tests/UnsaferowShuffleTest.cpp | 23 ++++++++++--------- 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp b/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp index b82b504d15188..79d639687e686 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp @@ -64,7 +64,7 @@ class ShuffleWriteOperator : public Operator { } private: - ShuffleInterface* const FOLLY_NONNULL shuffle_; + const std::shared_ptr shuffle_; }; } // namespace diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.h b/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.h index 9775075ea8212..2a35272e8289c 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.h +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.h @@ -23,7 +23,7 @@ class ShuffleWriteNode : public velox::core::PlanNode { public: ShuffleWriteNode( const velox::core::PlanNodeId& id, - ShuffleInterface* shuffle, + const std::shared_ptr& shuffle, velox::core::PlanNodePtr source) : velox::core::PlanNode(id), shuffle_{shuffle}, @@ -37,7 +37,7 @@ class ShuffleWriteNode : public velox::core::PlanNode { return sources_; } - ShuffleInterface* shuffle() const { + const std::shared_ptr& shuffle() const { return shuffle_; } @@ -48,7 +48,7 @@ class ShuffleWriteNode : public velox::core::PlanNode { private: void addDetails(std::stringstream& stream) const override {} - ShuffleInterface* shuffle_; + const std::shared_ptr shuffle_; const std::vector sources_; }; diff --git a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h index 405fcf2c5d159..eab9410b05706 100644 --- a/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/operators/UnsafeRowExchangeSource.h @@ -26,7 +26,7 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource { const std::string& taskId, int destination, std::shared_ptr queue, - ShuffleInterface* shuffle, + const std::shared_ptr& shuffle, velox::memory::MemoryPool* pool) : ExchangeSource(taskId, destination, queue, pool), shuffle_(shuffle) {} @@ -39,6 +39,6 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource { void close() override {} private: - ShuffleInterface* shuffle_; + const std::shared_ptr shuffle_; }; -} // namespace facebook::presto::operators \ No newline at end of file +} // namespace facebook::presto::operators diff --git a/presto-native-execution/presto_cpp/main/operators/tests/UnsaferowShuffleTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/UnsaferowShuffleTest.cpp index 4cdbd4aee3c55..61f8a51c680cf 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/UnsaferowShuffleTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/UnsaferowShuffleTest.cpp @@ -128,7 +128,7 @@ class TestShuffle : public ShuffleInterface { std::vector> readyPartitions_; }; -void registerExchangeSource(ShuffleInterface* shuffle) { +void registerExchangeSource(const std::shared_ptr& shuffle) { exec::ExchangeSource::registerFactory( [shuffle]( const std::string& taskId, @@ -166,7 +166,7 @@ auto addPartitionAndSerializeNode(uint32_t numPartitions) { }; } -auto addShuffleWriteNode(ShuffleInterface* shuffle) { +auto addShuffleWriteNode(const std::shared_ptr& shuffle) { return [shuffle]( core::PlanNodeId nodeId, core::PlanNodePtr source) -> core::PlanNodePtr { @@ -267,7 +267,7 @@ class UnsafeRowShuffleTest : public exec::test::OperatorTestBase { } void runShuffleTest( - ShuffleInterface* shuffle, + const std::shared_ptr& shuffle, size_t numPartitions, size_t numMapDrivers, const std::vector& data) { @@ -348,7 +348,7 @@ TEST_F(UnsafeRowShuffleTest, operators) { std::make_unique()); exec::Operator::registerOperator(std::make_unique()); - TestShuffle shuffle(pool(), 4, 1 << 20 /* 1MB */); + auto shuffle = std::make_shared(pool(), 4, 1 << 20 /* 1MB */); auto data = makeRowVector({ makeFlatVector({1, 2, 3, 4}), @@ -359,7 +359,7 @@ TEST_F(UnsafeRowShuffleTest, operators) { .values({data}, true) .addNode(addPartitionAndSerializeNode(4)) .localPartition({}) - .addNode(addShuffleWriteNode(&shuffle)) + .addNode(addShuffleWriteNode(shuffle)) .planNode(); exec::test::CursorParameters params; @@ -375,12 +375,13 @@ TEST_F(UnsafeRowShuffleTest, endToEnd) { size_t numPartitions = 5; size_t numMapDrivers = 2; - TestShuffle shuffle(pool(), numPartitions, 1 << 20 /* 1MB */); + auto shuffle = + std::make_shared(pool(), numPartitions, 1 << 20 /* 1MB */); auto data = vectorMaker_.rowVector({ makeFlatVector({1, 2, 3, 4, 5, 6}), makeFlatVector({10, 20, 30, 40, 50, 60}), }); - runShuffleTest(&shuffle, numPartitions, numMapDrivers, {data}); + runShuffleTest(shuffle, numPartitions, numMapDrivers, {data}); } TEST_F(UnsafeRowShuffleTest, persistentShuffleDeser) { @@ -432,14 +433,14 @@ TEST_F(UnsafeRowShuffleTest, persistentShuffle) { auto rootPath = rootDirectory->path; // Initialize persistent shuffle. - LocalPersistentShuffle shuffle( + auto shuffle = std::make_shared( rootPath, pool(), numPartitions, 1 << 20 /* 1MB */); auto data = vectorMaker_.rowVector({ makeFlatVector({1, 2, 3, 4, 5, 6}), makeFlatVector({10, 20, 30, 40, 50, 60}), }); - runShuffleTest(&shuffle, numPartitions, numMapDrivers, {data}); + runShuffleTest(shuffle, numPartitions, numMapDrivers, {data}); } TEST_F(UnsafeRowShuffleTest, persistentShuffleFuzz) { @@ -492,7 +493,7 @@ TEST_F(UnsafeRowShuffleTest, persistentShuffleFuzz) { velox::filesystems::registerLocalFileSystem(); auto rootDirectory = velox::exec::test::TempDirectoryPath::create(); auto rootPath = rootDirectory->path; - auto shuffle = std::make_unique( + auto shuffle = std::make_shared( rootPath, pool(), numPartitions, 1 << 15); for (int it = 0; it < numIterations; it++) { shuffle->reset(pool(), numPartitions, rootPath); @@ -505,7 +506,7 @@ TEST_F(UnsafeRowShuffleTest, persistentShuffleFuzz) { auto input = fuzzer.fuzzRow(rowType); inputVectors.push_back(input); } - runShuffleTest(shuffle.get(), numPartitions, numMapDrivers, inputVectors); + runShuffleTest(shuffle, numPartitions, numMapDrivers, inputVectors); } }