From 14b56d8f77adeb849948c117efec332c19c79b1a Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Sat, 9 Nov 2024 22:12:07 -0800 Subject: [PATCH] [native]support different vector serde in shuffle Advance velox version and update Prestissimo code to support configurable shuffle vector serde format Fix timing related flakiness in PrestoExchangeSourceTest.slowProducerAndEarlyTerminatingConsumer --- .../presto_cpp/main/PrestoServer.cpp | 11 ++ .../presto_cpp/main/operators/ShuffleRead.cpp | 5 +- .../presto_cpp/main/operators/ShuffleRead.h | 2 +- .../main/operators/tests/BroadcastTest.cpp | 17 +- .../main/tests/PrestoExchangeSourceTest.cpp | 6 +- .../presto_cpp/main/tests/TaskManagerTest.cpp | 176 ++++++++++-------- .../main/types/PrestoToVeloxQueryPlan.cpp | 44 ++++- presto-native-execution/velox | 2 +- 8 files changed, 168 insertions(+), 95 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 8c6e2d7a5a46a..f4c658fbec838 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -65,7 +65,9 @@ #include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" #include "velox/functions/prestosql/window/WindowFunctionsRegistration.h" +#include "velox/serializers/CompactRowSerializer.h" #include "velox/serializers/PrestoSerializer.h" +#include "velox/serializers/UnsafeRowSerializer.h" #ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS #include "presto_cpp/main/RemoteFunctionRegisterer.h" @@ -1275,6 +1277,15 @@ void PrestoServer::registerVectorSerdes() { if (!velox::isRegisteredVectorSerde()) { velox::serializer::presto::PrestoVectorSerde::registerVectorSerde(); } + if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kPresto)) { + velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde(); + } + if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kCompactRow)) { + velox::serializer::CompactRowVectorSerde::registerNamedVectorSerde(); + } + if (!isRegisteredNamedVectorSerde(VectorSerde::Kind::kUnsafeRow)) { + velox::serializer::spark::UnsafeRowVectorSerde::registerNamedVectorSerde(); + } } void PrestoServer::registerFileSinks() { diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp index 9032b49d7f322..2c8d8fb1dd3c2 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.cpp @@ -28,7 +28,7 @@ class ShuffleReadOperator : public Exchange { public: ShuffleReadOperator( int32_t operatorId, - DriverCtx* FOLLY_NONNULL ctx, + DriverCtx* ctx, const std::shared_ptr& shuffleReadNode, std::shared_ptr exchangeClient) : Exchange( @@ -36,7 +36,8 @@ class ShuffleReadOperator : public Exchange { ctx, std::make_shared( shuffleReadNode->id(), - shuffleReadNode->outputType()), + shuffleReadNode->outputType(), + velox::VectorSerde::Kind::kCompactRow), exchangeClient, "ShuffleRead"), serde_(std::make_unique()) {} diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.h b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.h index 059dca9e09e4d..43dd288b89322 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleRead.h +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleRead.h @@ -54,7 +54,7 @@ class ShuffleReadNode : public velox::core::PlanNode { // Nothing to add } - velox::RowTypePtr outputType_; + const velox::RowTypePtr outputType_; }; class ShuffleReadTranslator : public velox::exec::Operator::PlanNodeTranslator { diff --git a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp index f9cb9d94a8bf6..0973a8359c0b2 100644 --- a/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp +++ b/presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp @@ -13,8 +13,6 @@ */ #include #include -#include "folly/init/Init.h" -#include "presto_cpp/external/json/nlohmann/json.hpp" #include "presto_cpp/main/operators/BroadcastExchangeSource.h" #include "presto_cpp/main/operators/BroadcastWrite.h" #include "presto_cpp/main/operators/tests/PlanBuilder.h" @@ -106,7 +104,9 @@ class BroadcastTest : public exec::test::OperatorTestBase { const std::string& basePath, const std::vector& broadcastFilePaths) { // Create plan for read node using file path. - auto readerPlan = exec::test::PlanBuilder().exchange(dataType).planNode(); + auto readerPlan = exec::test::PlanBuilder() + .exchange(dataType, velox::VectorSerde::Kind::kPresto) + .planNode(); exec::test::CursorParameters broadcastReadParams; broadcastReadParams.planNode = readerPlan; @@ -211,7 +211,12 @@ class BroadcastTest : public exec::test::OperatorTestBase { auto byteStream = std::make_unique(std::move(ranges)); RowVectorPtr result; - VectorStreamGroup::read(byteStream.get(), pool(), dataType, &result); + VectorStreamGroup::read( + byteStream.get(), + pool(), + dataType, + velox::getNamedVectorSerde(velox::VectorSerde::Kind::kPresto), + &result); return result; } }; @@ -352,7 +357,9 @@ TEST_F(BroadcastTest, malformedBroadcastInfoJson) { std::string basePath = "/tmp"; std::string invalidBroadcastFilePath = "/tmp/file.bin"; - auto readerPlan = exec::test::PlanBuilder().exchange(dataType).planNode(); + auto readerPlan = exec::test::PlanBuilder() + .exchange(dataType, velox::VectorSerde::Kind::kPresto) + .planNode(); exec::test::CursorParameters broadcastReadParams; broadcastReadParams.planNode = readerPlan; diff --git a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp index e4455b10557fd..2ff251bba5eee 100644 --- a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp @@ -696,6 +696,7 @@ DEBUG_ONLY_TEST_P( std::function( ([&](const auto* prestoExchangeSource) { allCloseCheckPassed = true; + closeWait.notifyAll(); }))); SCOPED_TESTVALUE_SET( "facebook::presto::PrestoExchangeSource::handleDataResponse", @@ -739,8 +740,9 @@ DEBUG_ONLY_TEST_P( // all resources have been cleaned up, so explicitly waiting is the only way // to allow the execution of background processing. We expect the test to not // crash. - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - EXPECT_TRUE(codePointHit); + while (!codePointHit) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } serverWrapper.stop(); } diff --git a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp index acc741051f390..4dbfff1cd0911 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp @@ -113,11 +113,13 @@ class Cursor { TaskManager* taskManager, const protocol::TaskId& taskId, const RowTypePtr& rowType, + velox::VectorSerde::Kind serdeKind, memory::MemoryPool* pool) : pool_(pool), taskManager_(taskManager), taskId_(taskId), - rowType_(rowType) {} + rowType_(rowType), + serdeKind_(serdeKind) {} std::optional> next() { if (atEnd_) { @@ -157,23 +159,25 @@ class Cursor { const_cast(range.data()), (int32_t)range.size(), 0}); } - auto input = std::make_unique(std::move(byteRanges)); - + const auto input = + std::make_unique(std::move(byteRanges)); + auto* serde = velox::getNamedVectorSerde(serdeKind_); std::vector vectors; while (!input->atEnd()) { RowVectorPtr vector; - VectorStreamGroup::read(input.get(), pool_, rowType_, &vector); + VectorStreamGroup::read(input.get(), pool_, rowType_, serde, &vector); vectors.emplace_back(vector); } return vectors; } - memory::MemoryPool* pool_; - TaskManager* taskManager_; + memory::MemoryPool* const pool_; + TaskManager* const taskManager_; const protocol::TaskId taskId_; - RowTypePtr rowType_; - bool atEnd_ = false; - uint64_t sequence_ = 0; + const RowTypePtr rowType_; + const velox::VectorSerde::Kind serdeKind_; + bool atEnd_{false}; + uint64_t sequence_{0}; }; void setAggregationSpillConfig( @@ -182,8 +186,17 @@ void setAggregationSpillConfig( queryConfigs.emplace(core::QueryConfig::kAggregationSpillEnabled, "true"); } -class TaskManagerTest : public exec::test::OperatorTestBase { +class TaskManagerTest : public exec::test::OperatorTestBase, + public testing::WithParamInterface { public: + static std::vector getTestParams() { + const std::vector kinds( + {VectorSerde::Kind::kPresto, + VectorSerde::Kind::kCompactRow, + VectorSerde::Kind::kUnsafeRow}); + return kinds; + } + static void SetUpTestCase() { OperatorTestBase::SetUpTestCase(); filesystems::registerLocalFileSystem(); @@ -376,7 +389,8 @@ class TaskManagerTest : public exec::test::OperatorTestBase { const protocol::TaskId& taskId, const RowTypePtr& resultType, const std::vector& allTaskIds) { - Cursor cursor(taskManager_.get(), taskId, resultType, pool_.get()); + Cursor cursor( + taskManager_.get(), taskId, resultType, GetParam(), pool_.get()); std::vector vectors; for (;;) { auto moreVectors = cursor.next(); @@ -416,10 +430,11 @@ class TaskManagerTest : public exec::test::OperatorTestBase { const RowTypePtr& outputType, long& splitSequenceId, protocol::TaskId outputTaskId = "output.0.0.1.0") { - auto planFragment = exec::test::PlanBuilder() - .exchange(outputType) - .partitionedOutput({}, 1) - .planFragment(); + auto planFragment = + exec::test::PlanBuilder() + .exchange(outputType, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planFragment(); protocol::TaskUpdateRequest updateRequest; updateRequest.sources.push_back( @@ -497,7 +512,7 @@ class TaskManagerTest : public exec::test::OperatorTestBase { .tableScan(rowType_) .project({"c0 % 5"}) .partialAggregation({"p0"}, {"count(1)"}) - .partitionedOutput({"p0"}, numPartitions, {"p0", "a0"}) + .partitionedOutput({"p0"}, numPartitions, {"p0", "a0"}, GetParam()) .planFragment(); TaskIdGenerator taskIdGenerator(queryId); @@ -526,10 +541,12 @@ class TaskManagerTest : public exec::test::OperatorTestBase { .localPartition( {"p0"}, {exec::test::PlanBuilder(planNodeIdGenerator) - .exchange(partialAggPlanFragment.planNode->outputType()) + .exchange( + partialAggPlanFragment.planNode->outputType(), + GetParam()) .planNode()}) .finalAggregation({"p0"}, {"count(a0)"}, {{BIGINT()}}) - .partitionedOutput({}, 1, {"p0", "a0"}) + .partitionedOutput({}, 1, {"p0", "a0"}, GetParam()) .planFragment(); std::vector finalAggTasks; @@ -685,7 +702,7 @@ class TaskManagerTest : public exec::test::OperatorTestBase { // Runs "select * from t where c0 % 5 = 0" query. // Creates one task and provides all splits at once. -TEST_F(TaskManagerTest, tableScanAllSplitsAtOnce) { +TEST_P(TaskManagerTest, tableScanAllSplitsAtOnce) { const auto tableDir = exec::test::TempDirectoryPath::create(); auto filePaths = makeFilePaths(tableDir, 5); auto vectors = makeVectors(filePaths.size(), 1'000); @@ -697,7 +714,7 @@ TEST_F(TaskManagerTest, tableScanAllSplitsAtOnce) { auto planFragment = exec::test::PlanBuilder() .tableScan(rowType_) .filter("c0 % 5 = 0") - .partitionedOutput({}, 1, {"c0", "c1"}) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) .planFragment(); long splitSequenceId{0}; @@ -712,7 +729,7 @@ TEST_F(TaskManagerTest, tableScanAllSplitsAtOnce) { assertResults(taskId, rowType_, "SELECT * FROM tmp WHERE c0 % 5 = 0"); } -TEST_F(TaskManagerTest, fecthFromFinishedTask) { +TEST_P(TaskManagerTest, fecthFromFinishedTask) { const auto tableDir = exec::test::TempDirectoryPath::create(); auto filePaths = makeFilePaths(tableDir, 5); auto vectors = makeVectors(filePaths.size(), 1'000); @@ -724,7 +741,7 @@ TEST_F(TaskManagerTest, fecthFromFinishedTask) { auto planFragment = exec::test::PlanBuilder() .tableScan(rowType_) .filter("c0 % 5 = 0") - .partitionedOutputArbitrary({"c0", "c1"}) + .partitionedOutputArbitrary({"c0", "c1"}, GetParam()) .planFragment(); const protocol::TaskId taskId = "scan.0.0.1.0"; @@ -763,7 +780,7 @@ TEST_F(TaskManagerTest, fecthFromFinishedTask) { ASSERT_TRUE(newResult.value()->complete); } -DEBUG_ONLY_TEST_F(TaskManagerTest, fecthFromArbitraryOutput) { +DEBUG_ONLY_TEST_P(TaskManagerTest, fecthFromArbitraryOutput) { // Block output until the first fetch destination becomes inactive. folly::EventCount outputWait; std::atomic outputWaitFlag{false}; @@ -777,7 +794,7 @@ DEBUG_ONLY_TEST_F(TaskManagerTest, fecthFromArbitraryOutput) { const std::vector batches = makeVectors(1, 1'000); auto planFragment = exec::test::PlanBuilder() .values(batches) - .partitionedOutputArbitrary({"c0", "c1"}) + .partitionedOutputArbitrary({"c0", "c1"}, GetParam()) .planFragment(); const protocol::TaskId taskId = "source.0.0.1.0"; const auto taskInfo = createOrUpdateTask(taskId, {}, planFragment); @@ -827,7 +844,7 @@ DEBUG_ONLY_TEST_F(TaskManagerTest, fecthFromArbitraryOutput) { prestoTask->task.get(), TaskState::kFinished, 3'000'000)); } -TEST_F(TaskManagerTest, taskCleanupWithPendingResultData) { +TEST_P(TaskManagerTest, taskCleanupWithPendingResultData) { // Trigger old task cleanup immediately. taskManager_->setOldTaskCleanUpMs(0); @@ -841,7 +858,7 @@ TEST_F(TaskManagerTest, taskCleanupWithPendingResultData) { auto planFragment = exec::test::PlanBuilder() .tableScan(rowType_) .filter("c0 % 5 = 0") - .partitionedOutput({}, 1, {"c0", "c1"}) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) .planFragment(); long splitSequenceId{0}; @@ -886,7 +903,7 @@ TEST_F(TaskManagerTest, taskCleanupWithPendingResultData) { // Runs "select * from t where c0 % 5 = 1" query. // Creates one task and provides splits one at a time. -TEST_F(TaskManagerTest, tableScanOneSplitAtATime) { +TEST_P(TaskManagerTest, tableScanOneSplitAtATime) { const auto tableDir = exec::test::TempDirectoryPath::create(); auto filePaths = makeFilePaths(tableDir, 5); auto vectors = makeVectors(filePaths.size(), 1'000); @@ -898,7 +915,7 @@ TEST_F(TaskManagerTest, tableScanOneSplitAtATime) { auto planFragment = exec::test::PlanBuilder() .tableScan(rowType_) .filter("c0 % 5 = 1") - .partitionedOutput({}, 1, {"c0", "c1"}) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) .planFragment(); protocol::TaskId taskId = "scan.0.0.1.0"; @@ -921,7 +938,7 @@ TEST_F(TaskManagerTest, tableScanOneSplitAtATime) { } // Runs 2-stage tableScan: (1) multiple table scan tasks; (2) single output task -TEST_F(TaskManagerTest, tableScanMultipleTasks) { +TEST_P(TaskManagerTest, tableScanMultipleTasks) { const auto tableDir = exec::test::TempDirectoryPath::create(); auto filePaths = makeFilePaths(tableDir, 5); auto vectors = makeVectors(filePaths.size(), 1'000); @@ -933,7 +950,7 @@ TEST_F(TaskManagerTest, tableScanMultipleTasks) { auto planFragment = exec::test::PlanBuilder() .tableScan(rowType_) .filter("c0 % 5 = 1") - .partitionedOutput({}, 1, {"c0", "c1"}) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) .planFragment(); TaskIdGenerator taskIdGenerator("scan"); @@ -957,7 +974,7 @@ TEST_F(TaskManagerTest, tableScanMultipleTasks) { // Create a task to scan an empty (invalid) ORC file. Ensure that the error // propagates via getTaskStatus(). -TEST_F(TaskManagerTest, emptyFile) { +TEST_P(TaskManagerTest, emptyFile) { const auto tableDir = exec::test::TempDirectoryPath::create(); auto filePaths = makeEmptyFiles(tableDir, 1); auto planFragment = exec::test::PlanBuilder() @@ -994,7 +1011,7 @@ TEST_F(TaskManagerTest, emptyFile) { } } -TEST_F(TaskManagerTest, countAggregation) { +TEST_P(TaskManagerTest, countAggregation) { const auto tableDir = exec::test::TempDirectoryPath::create(); auto filePaths = makeFilePaths(tableDir, 5); auto vectors = makeVectors(filePaths.size(), 1'000); @@ -1008,7 +1025,7 @@ TEST_F(TaskManagerTest, countAggregation) { // Run distributed sort query that has 2 stages. First stage runs multiple // tasks with partial sort. Second stage runs single task with merge exchange. -TEST_F(TaskManagerTest, distributedSort) { +TEST_P(TaskManagerTest, distributedSort) { const auto tableDir = exec::test::TempDirectoryPath::create(); auto filePaths = makeFilePaths(tableDir, 5); auto vectors = makeVectors(filePaths.size(), 1'000); @@ -1018,11 +1035,12 @@ TEST_F(TaskManagerTest, distributedSort) { duckDbQueryRunner_.createTable("tmp", vectors); // Create partial sort tasks. - auto partialSortPlan = exec::test::PlanBuilder() - .tableScan(rowType_) - .orderBy({"c0"}, true) - .partitionedOutput({}, 1) - .planFragment(); + auto partialSortPlan = + exec::test::PlanBuilder() + .tableScan(rowType_) + .orderBy({"c0"}, true) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planFragment(); TaskIdGenerator taskIdGenerator("distributed-sort"); @@ -1040,10 +1058,11 @@ TEST_F(TaskManagerTest, distributedSort) { } // Create final sort task. - auto finalSortPlan = exec::test::PlanBuilder() - .mergeExchange(rowType_, {"c0"}) - .partitionedOutput({}, 1) - .planFragment(); + auto finalSortPlan = + exec::test::PlanBuilder() + .mergeExchange(rowType_, {"c0"}, GetParam()) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planFragment(); protocol::TaskUpdateRequest updateRequest; updateRequest.sources.push_back( @@ -1081,7 +1100,7 @@ TEST_F(TaskManagerTest, distributedSort) { finalPrestoTaskStats.peakUserMemoryInBytes); } -TEST_F(TaskManagerTest, outOfQueryUserMemory) { +TEST_P(TaskManagerTest, outOfQueryUserMemory) { const auto tableDir = exec::test::TempDirectoryPath::create(); auto filePaths = makeFilePaths(tableDir, 5); auto vectors = makeVectors(filePaths.size(), 1'000); @@ -1115,7 +1134,7 @@ TEST_F(TaskManagerTest, outOfQueryUserMemory) { } // Tests whether the returned futures timeout. -TEST_F(TaskManagerTest, outOfOrderRequests) { +TEST_P(TaskManagerTest, outOfOrderRequests) { auto eventBase = folly::EventBaseManager::get()->getEventBase(); // 5 minute wait to ensure we don't timeout the out-of-order requests. auto longWait = protocol::Duration("300s"); @@ -1129,7 +1148,7 @@ TEST_F(TaskManagerTest, outOfOrderRequests) { auto planFragment = exec::test::PlanBuilder() .values(vectors) .filter("c0 % 5 = 0") - .partitionedOutput({}, 1, {"c0", "c1"}) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) .planFragment(); protocol::TaskId taskId = "scan.0.0.1.0"; @@ -1159,7 +1178,7 @@ TEST_F(TaskManagerTest, outOfOrderRequests) { } // Tests whether the returned futures timeout. -TEST_F(TaskManagerTest, timeoutOutOfOrderRequests) { +TEST_P(TaskManagerTest, timeoutOutOfOrderRequests) { auto eventBase = folly::EventBaseManager::get()->getEventBase(); auto shortWait = protocol::Duration("100ms"); auto longWait = std::chrono::seconds(5); @@ -1197,7 +1216,7 @@ TEST_F(TaskManagerTest, timeoutOutOfOrderRequests) { .getVia(eventBase)); } -TEST_F(TaskManagerTest, aggregationSpill) { +TEST_P(TaskManagerTest, aggregationSpill) { // NOTE: we need to write more than one batches to each file (source split) to // trigger spill. const int numBatchesPerFile = 5; @@ -1242,7 +1261,7 @@ TEST_F(TaskManagerTest, aggregationSpill) { } } -TEST_F(TaskManagerTest, buildTaskSpillDirectoryPath) { +TEST_P(TaskManagerTest, buildTaskSpillDirectoryPath) { EXPECT_EQ( "fs::/base/presto_native/192.168.10.2_19/2022-12-20/20221220-Q/Task1/", TaskManager::buildTaskSpillDirectoryPath( @@ -1271,7 +1290,7 @@ TEST_F(TaskManagerTest, buildTaskSpillDirectoryPath) { false)); } -TEST_F(TaskManagerTest, getDataOnAbortedTask) { +TEST_P(TaskManagerTest, getDataOnAbortedTask) { // Simulate scenario where Driver encountered a VeloxException and terminated // a task, which removes the entry in BufferManager. The main taskmanager // tries to process the resultRequest and calls getData() which must return @@ -1279,7 +1298,7 @@ TEST_F(TaskManagerTest, getDataOnAbortedTask) { auto planFragment = exec::test::PlanBuilder() .tableScan(rowType_) .filter("c0 % 5 = 0") - .partitionedOutput({}, 1, {"c0", "c1"}) + .partitionedOutput({}, 1, {"c0", "c1"}, GetParam()) .planFragment(); int token = 123; @@ -1319,7 +1338,7 @@ TEST_F(TaskManagerTest, getDataOnAbortedTask) { ASSERT_TRUE(promiseFulfilled); } -TEST_F(TaskManagerTest, getResultsFromFailedTask) { +TEST_P(TaskManagerTest, getResultsFromFailedTask) { const protocol::TaskId taskId = "error-task.0.0.0.0"; std::exception e; taskManager_->createOrUpdateErrorTask(taskId, std::make_exception_ptr(e), 0); @@ -1349,7 +1368,7 @@ TEST_F(TaskManagerTest, getResultsFromFailedTask) { ASSERT_EQ(results->data->capacity(), 0); } -TEST_F(TaskManagerTest, getResultsFromAbortedTask) { +TEST_P(TaskManagerTest, getResultsFromAbortedTask) { const protocol::TaskId taskId = "aborted-task.0.0.0.0"; // deleting a non existing task creates an aborted task taskManager_->deleteTask(taskId, true); @@ -1379,12 +1398,13 @@ TEST_F(TaskManagerTest, getResultsFromAbortedTask) { ASSERT_EQ(results->data->capacity(), 0); } -TEST_F(TaskManagerTest, testCumulativeMemory) { +TEST_P(TaskManagerTest, testCumulativeMemory) { const std::vector batches = makeVectors(4, 128); - const auto planFragment = exec::test::PlanBuilder() - .values(batches) - .partitionedOutput({}, 1) - .planFragment(); + const auto planFragment = + exec::test::PlanBuilder() + .values(batches) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planFragment(); auto queryCtx = core::QueryCtx::create(driverExecutor_.get()); const protocol::TaskId taskId = "scan.0.0.1.0"; auto veloxTask = Task::create( @@ -1467,28 +1487,29 @@ TEST_F(TaskManagerTest, testCumulativeMemory) { velox::exec::test::waitForAllTasksToBeDeleted(3'000'000); } -TEST_F(TaskManagerTest, checkBatchSplits) { +TEST_P(TaskManagerTest, checkBatchSplits) { const auto taskId = "test.1.2.3.4"; core::PlanNodeId probeId; core::PlanNodeId buildId; auto planNodeIdGenerator = std::make_shared(); - auto planFragment = exec::test::PlanBuilder(planNodeIdGenerator) - .tableScan(rowType_) - .capturePlanNodeId(probeId) - .hashJoin( - {"c0"}, - {"u_c0"}, - exec::test::PlanBuilder(planNodeIdGenerator) - .tableScan(rowType_) - .capturePlanNodeId(buildId) - .project({"c0 as u_c0", "c1 as u_c1"}) - .planNode(), - "", - {"u_c0", "u_c1"}) - .singleAggregation({}, {"count(1)"}) - .partitionedOutput({}, 1) - .planFragment(); + auto planFragment = + exec::test::PlanBuilder(planNodeIdGenerator) + .tableScan(rowType_) + .capturePlanNodeId(probeId) + .hashJoin( + {"c0"}, + {"u_c0"}, + exec::test::PlanBuilder(planNodeIdGenerator) + .tableScan(rowType_) + .capturePlanNodeId(buildId) + .project({"c0 as u_c0", "c1 as u_c1"}) + .planNode(), + "", + {"u_c0", "u_c1"}) + .singleAggregation({}, {"count(1)"}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planFragment(); // No splits. auto queryCtx = @@ -1525,7 +1546,7 @@ TEST_F(TaskManagerTest, checkBatchSplits) { ASSERT_EQ(resultOrFailure.status, nullptr); } -TEST_F(TaskManagerTest, buildSpillDirectoryFailure) { +TEST_P(TaskManagerTest, buildSpillDirectoryFailure) { // Cleanup old tasks between test iterations. taskManager_->setOldTaskCleanUpMs(0); for (bool buildSpillDirectoryFailure : {false}) { @@ -1582,6 +1603,9 @@ TEST_F(TaskManagerTest, buildSpillDirectoryFailure) { } } -// TODO: add disk spilling test for order by and hash join later. +VELOX_INSTANTIATE_TEST_SUITE_P( + TaskManagerTest, + TaskManagerTest, + testing::ValuesIn(TaskManagerTest::getTestParams())); } // namespace } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index 9d32452444625..e74b3111b9421 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -1828,7 +1828,10 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( "Unsupported partitioning function: {}", toJsonString(systemPartitioningHandle->function)); planFragment.planNode = core::PartitionedOutputNode::single( - partitionedOutputNodeId, outputType, sourceNode); + partitionedOutputNodeId, + outputType, + velox::VectorSerde::Kind::kPresto, + sourceNode); return planFragment; case protocol::SystemPartitioning::FIXED: { switch (systemPartitioningHandle->function) { @@ -1837,7 +1840,10 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( if (numPartitions == 1) { planFragment.planNode = core::PartitionedOutputNode::single( - partitionedOutputNodeId, outputType, sourceNode); + partitionedOutputNodeId, + outputType, + velox::VectorSerde::Kind::kPresto, + sourceNode); return planFragment; } planFragment.planNode = @@ -1849,6 +1855,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( partitioningScheme.replicateNullsAndAny, std::make_shared(), outputType, + velox::VectorSerde::Kind::kPresto, sourceNode); return planFragment; } @@ -1857,7 +1864,10 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( if (numPartitions == 1) { planFragment.planNode = core::PartitionedOutputNode::single( - partitionedOutputNodeId, outputType, sourceNode); + partitionedOutputNodeId, + outputType, + velox::VectorSerde::Kind::kPresto, + sourceNode); return planFragment; } planFragment.planNode = @@ -1870,12 +1880,17 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( std::make_shared( inputType, keyChannels, constValues), outputType, + velox::VectorSerde::Kind::kPresto, sourceNode); return planFragment; } case protocol::SystemPartitionFunction::BROADCAST: { planFragment.planNode = core::PartitionedOutputNode::broadcast( - partitionedOutputNodeId, 1, outputType, sourceNode); + partitionedOutputNodeId, + 1, + outputType, + velox::VectorSerde::Kind::kPresto, + sourceNode); return planFragment; } default: @@ -1893,6 +1908,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( planFragment.planNode = core::PartitionedOutputNode::arbitrary( partitionedOutputNodeId, std::move(outputType), + velox::VectorSerde::Kind::kPresto, std::move(sourceNode)); return planFragment; } @@ -1909,7 +1925,10 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( if (numPartitions == 1) { planFragment.planNode = core::PartitionedOutputNode::single( - partitionedOutputNodeId, outputType, sourceNode); + partitionedOutputNodeId, + outputType, + velox::VectorSerde::Kind::kPresto, + sourceNode); return planFragment; } @@ -1924,6 +1943,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( partitioningScheme.replicateNullsAndAny, std::shared_ptr(std::move(spec)), toRowType(partitioningScheme.outputLayout, typeParser_), + velox::VectorSerde::Kind::kPresto, sourceNode); return planFragment; } @@ -1935,6 +1955,7 @@ core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan( return core::PartitionedOutputNode::single( node->id, toRowType(node->outputVariables, typeParser_), + velox::VectorSerde::Kind::kPresto, VeloxQueryPlanConverterBase::toVeloxQueryPlan( node->source, tableWriteInfo, taskId)); } @@ -1955,9 +1976,14 @@ velox::core::PlanNodePtr VeloxInteractiveQueryPlanConverter::toVeloxQueryPlan( sortingOrders.emplace_back(toVeloxSortOrder(orderBy.sortOrder)); } return std::make_shared( - node->id, rowType, sortingKeys, sortingOrders); + node->id, + rowType, + sortingKeys, + sortingOrders, + velox::VectorSerde::Kind::kPresto); } - return std::make_shared(node->id, rowType); + return std::make_shared( + node->id, rowType, velox::VectorSerde::Kind::kPresto); } velox::connector::CommitStrategy @@ -1996,6 +2022,7 @@ velox::core::PlanFragment VeloxBatchQueryPlanConverter::toVeloxQueryPlan( partitionedOutputNode->id(), 1, broadcastWriteNode->outputType(), + velox::VectorSerde::Kind::kPresto, {broadcastWriteNode}); return planFragment; } @@ -2044,7 +2071,8 @@ velox::core::PlanNodePtr VeloxBatchQueryPlanConverter::toVeloxQueryPlan( auto rowType = toRowType(node->outputVariables, typeParser_); // Broadcast exchange source. if (node->exchangeType == protocol::ExchangeNodeType::REPLICATE) { - return std::make_shared(node->id, rowType); + return std::make_shared( + node->id, rowType, velox::VectorSerde::Kind::kPresto); } // Partitioned shuffle exchange source. return std::make_shared(node->id, rowType); diff --git a/presto-native-execution/velox b/presto-native-execution/velox index c5232cd317499..12b52e70ec85a 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit c5232cd3174998a7834551783ae95776949c9da8 +Subproject commit 12b52e70ec85ae0cdb4aa990797cddda9be5be27