diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index f1e56ed7cfd6..5a09581edcb5 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -1853,6 +1853,7 @@ folly::dynamic TableWriteNode::serialize() const { obj["connectorInsertTableHandle"] = insertTableHandle_->connectorInsertTableHandle()->serialize(); obj["hasPartitioningScheme"] = hasPartitioningScheme_; + obj["hasBucketProperty"] = hasBucketProperty_; obj["outputType"] = outputType_->serialize(); obj["commitStrategy"] = connector::commitStrategyToString(commitStrategy_); return obj; @@ -1875,6 +1876,7 @@ PlanNodePtr TableWriteNode::create(const folly::dynamic& obj, void* context) { ISerializable::deserialize( obj["connectorInsertTableHandle"])); const bool hasPartitioningScheme = obj["hasPartitioningScheme"].asBool(); + const bool hasBucketProperty = obj["hasBucketProperty"].asBool(); auto outputType = deserializeRowType(obj["outputType"]); auto commitStrategy = connector::stringToCommitStrategy(obj["commitStrategy"].asString()); @@ -1887,6 +1889,7 @@ PlanNodePtr TableWriteNode::create(const folly::dynamic& obj, void* context) { std::make_shared( connectorId, connectorInsertTableHandle), hasPartitioningScheme, + hasBucketProperty, outputType, commitStrategy, source); diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index f3ff68e802fb..3600a86897ed 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -671,6 +671,7 @@ class TableWriteNode : public PlanNode { std::shared_ptr aggregationNode, std::shared_ptr insertTableHandle, bool hasPartitioningScheme, + bool hasBucketProperty, RowTypePtr outputType, connector::CommitStrategy commitStrategy, const PlanNodePtr& source) @@ -681,6 +682,7 @@ class TableWriteNode : public PlanNode { aggregationNode_(std::move(aggregationNode)), insertTableHandle_(std::move(insertTableHandle)), hasPartitioningScheme_(hasPartitioningScheme), + hasBucketProperty_(hasBucketProperty), outputType_(std::move(outputType)), 
commitStrategy_(commitStrategy) { VELOX_USER_CHECK_EQ(columns->size(), columnNames.size()); @@ -693,6 +695,30 @@ class TableWriteNode : public PlanNode { } } +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY + TableWriteNode( + const PlanNodeId& id, + const RowTypePtr& columns, + const std::vector& columnNames, + std::shared_ptr aggregationNode, + std::shared_ptr insertTableHandle, + bool hasPartitioningScheme, + RowTypePtr outputType, + connector::CommitStrategy commitStrategy, + const PlanNodePtr& source) + : TableWriteNode( + id, + columns, + columnNames, + std::move(aggregationNode), + std::move(insertTableHandle), + hasPartitioningScheme, + false, + std::move(outputType), + commitStrategy, + source) {} +#endif + const std::vector& sources() const override { return sources_; } @@ -720,12 +746,20 @@ class TableWriteNode : public PlanNode { /// Indicates if this table write has specified partitioning scheme. If true, /// the task creates a number of table write operators based on the query /// config 'task_partitioned_writer_count', otherwise based on - /// 'task__writer_count'. As for now, this is only true for hive bucketed - /// table write. + /// 'task_writer_count'. bool hasPartitioningScheme() const { return hasPartitioningScheme_; } + /// Indicates if this table write has specified bucket property. If true, the + /// task creates a number of table write operators based on the query config + /// 'task_bucketed_writer_count', otherwise based on + /// 'task_partitioned_writer_count' or 'task_writer_count' depending on + /// whether partition scheme is specified or not.
+ bool hasBucketProperty() const { + return hasBucketProperty_; + } + connector::CommitStrategy commitStrategy() const { return commitStrategy_; } @@ -756,6 +790,7 @@ class TableWriteNode : public PlanNode { const std::shared_ptr aggregationNode_; const std::shared_ptr insertTableHandle_; const bool hasPartitioningScheme_; + const bool hasBucketProperty_; const RowTypePtr outputType_; const connector::CommitStrategy commitStrategy_; }; diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 6bd934d0c4f5..ad42655b69c1 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -300,6 +300,11 @@ class QueryConfig { static constexpr const char* kTaskPartitionedWriterCount = "task_partitioned_writer_count"; + /// The number of local parallel table writer operators per task for + /// bucketed writes. If not set, use "task_writer_count". + static constexpr const char* kTaskBucketedWriterCount = + "task_bucketed_writer_count"; + /// If true, finish the hash probe on an empty build table for a specific set /// of hash joins. 
static constexpr const char* kHashProbeFinishEarlyOnEmptyBuild = @@ -767,6 +772,10 @@ class QueryConfig { .value_or(taskWriterCount()); } + uint32_t taskBucketedWriterCount() const { + return get(kTaskBucketedWriterCount).value_or(taskWriterCount()); + } + bool hashProbeFinishEarlyOnEmptyBuild() const { return get(kHashProbeFinishEarlyOnEmptyBuild, false); } diff --git a/velox/core/tests/QueryConfigTest.cpp b/velox/core/tests/QueryConfigTest.cpp index c89d44d2fdfa..227f85a2fbbe 100644 --- a/velox/core/tests/QueryConfigTest.cpp +++ b/velox/core/tests/QueryConfigTest.cpp @@ -57,27 +57,42 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) { struct { std::optional numWriterCounter; std::optional numPartitionedWriterCounter; + std::optional numBucketedWriterCounter; int expectedWriterCounter; int expectedPartitionedWriterCounter; + int expectedBucketedWriterCounter; std::string debugString() const { return fmt::format( - "numWriterCounter[{}] numPartitionedWriterCounter[{}] expectedWriterCounter[{}] expectedPartitionedWriterCounter[{}]", + "numWriterCounter[{}] numPartitionedWriterCounter[{}] numBucketedWriterCounter[{}] expectedWriterCounter[{}] expectedPartitionedWriterCounter[{}] expectedBucketedWriterCounter[{}]", numWriterCounter.value_or(0), numPartitionedWriterCounter.value_or(0), + numBucketedWriterCounter.value_or(0), expectedWriterCounter, - expectedPartitionedWriterCounter); + expectedPartitionedWriterCounter, + expectedBucketedWriterCounter); } } testSettings[] = { - {std::nullopt, std::nullopt, 4, 4}, - {std::nullopt, 1, 4, 1}, - {std::nullopt, 6, 4, 6}, - {2, 4, 2, 4}, - {4, 2, 4, 2}, - {4, 6, 4, 6}, - {6, 5, 6, 5}, - {6, 4, 6, 4}, - {6, std::nullopt, 6, 6}}; + {std::nullopt, std::nullopt, std::nullopt, 4, 4, 4}, + {std::nullopt, 1, std::nullopt, 4, 1, 4}, + {std::nullopt, 6, std::nullopt, 4, 6, 4}, + {2, 4, std::nullopt, 2, 4, 2}, + {4, 2, std::nullopt, 4, 2, 4}, + {4, 6, std::nullopt, 4, 6, 4}, + {6, 5, std::nullopt, 6, 5, 6}, + {6, 4, std::nullopt, 6, 4, 6}, + {6, std::nullopt,
6, 6, 6, 6}, + {6, std::nullopt, 1, 6, 6, 1}, + {std::nullopt, std::nullopt, 4, 4, 4, 4}, + {std::nullopt, std::nullopt, 1, 4, 4, 1}, + {std::nullopt, 1, 1, 4, 1, 1}, + {std::nullopt, 1, 2, 4, 1, 2}, + {std::nullopt, 6, 6, 4, 6, 6}, + {std::nullopt, 6, 3, 4, 6, 3}, + {2, 4, 3, 2, 4, 3}, + {4, 2, 1, 4, 2, 1}, + {4, 6, 7, 4, 6, 7}, + {6, std::nullopt, 4, 6, 6, 4}}; for (const auto& testConfig : testSettings) { SCOPED_TRACE(testConfig.debugString()); std::unordered_map configData; @@ -91,6 +106,11 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) { QueryConfig::kTaskPartitionedWriterCount, std::to_string(testConfig.numPartitionedWriterCounter.value())); } + if (testConfig.numBucketedWriterCounter.has_value()) { + configData.emplace( + QueryConfig::kTaskBucketedWriterCount, + std::to_string(testConfig.numBucketedWriterCounter.value())); + } auto queryCtx = QueryCtx::create(nullptr, QueryConfig{std::move(configData)}); const QueryConfig& config = queryCtx->queryConfig(); @@ -98,6 +118,9 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) { ASSERT_EQ( config.taskPartitionedWriterCount(), testConfig.expectedPartitionedWriterCounter); + ASSERT_EQ( + config.taskBucketedWriterCount(), + testConfig.expectedBucketedWriterCounter); } } diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 87f12b445ce9..2abf0ba77e62 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -382,6 +382,10 @@ Table Writer - 1 - The number of parallel table writer threads per task. * - task_partitioned_writer_count + - integer + - task_writer_count + - The number of parallel table writer threads per task for partitioned table writes. If not set, use 'task_writer_count' as default. + * - task_bucketed_writer_count - integer - task_writer_count - The number of parallel table writer threads per task for bucketed table writes. If not set, use 'task_writer_count' as default. 
diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp index bf99d78e4cf0..77d538e3aac1 100644 --- a/velox/exec/LocalPlanner.cpp +++ b/velox/exec/LocalPlanner.cpp @@ -238,7 +238,9 @@ uint32_t maxDrivers( if (!connectorInsertHandle->supportsMultiThreading()) { return 1; } else { - if (tableWrite->hasPartitioningScheme()) { + if (tableWrite->hasBucketProperty()) { + return queryConfig.taskBucketedWriterCount(); + } else if (tableWrite->hasPartitioningScheme()) { return queryConfig.taskPartitionedWriterCount(); } else { return queryConfig.taskWriterCount(); diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index eac3342dedd2..15b0247033cb 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -49,6 +49,8 @@ using namespace facebook::velox::dwio::common; using namespace facebook::velox::common::testutil; using namespace facebook::velox::common::hll; +constexpr uint64_t kQueryMemoryCapacity = 512 * MB; + enum class TestMode { kUnpartitioned, kPartitioned, @@ -100,6 +102,7 @@ std::function addTableWriter( const std::shared_ptr& aggregationNode, const std::shared_ptr& insertHandle, bool hasPartitioningScheme, + bool hasBucketProperty, connector::CommitStrategy commitStrategy = connector::CommitStrategy::kNoCommit) { return [=](core::PlanNodeId nodeId, @@ -111,6 +114,7 @@ std::function addTableWriter( aggregationNode, insertHandle, hasPartitioningScheme, + hasBucketProperty, TableWriteTraits::outputType(aggregationNode), commitStrategy, std::move(source)); @@ -604,6 +608,7 @@ class TableWriteTest : public HiveConnectorTestBase { partitionedBy, bucketProperty, compressionKind), + !partitionedBy.empty(), bucketProperty != nullptr, outputCommitStrategy)) .capturePlanNodeId(tableWriteNodeId_); @@ -628,6 +633,7 @@ class TableWriteTest : public HiveConnectorTestBase { partitionedBy, bucketProperty, compressionKind), + !partitionedBy.empty(), bucketProperty != nullptr, 
outputCommitStrategy)) .capturePlanNodeId(tableWriteNodeId_) @@ -669,6 +675,7 @@ class TableWriteTest : public HiveConnectorTestBase { partitionedBy, bucketProperty, compressionKind), + !partitionedBy.empty(), bucketProperty != nullptr, outputCommitStrategy)) .capturePlanNodeId(tableWriteNodeId_) @@ -1023,13 +1030,6 @@ class TableWriteTest : public HiveConnectorTestBase { const TestParam testParam_; const FileFormat fileFormat_; const TestMode testMode_; - // Returns all available table types to test insert without any - // partitions (used in "immutablePartitions" set of tests). - const std::vector tableTypes_ = { - // Velox does not currently support TEMPORARY table type. - // Once supported, it should be added to this list. - connector::hive::LocationHandle::TableType::kNew, - connector::hive::LocationHandle::TableType::kExisting}; const int numTableWriterCount_; const int numPartitionedTableWriterCount_; @@ -1605,6 +1605,65 @@ TEST_P(AllTableWriterTest, scanFilterProjectWrite) { } } +TEST_P(AllTableWriterTest, writerDriverThreads) { + const int batchSize = 1'000; + const int numBatches = 20; + const std::vector vectors = makeVectors(numBatches, batchSize); + + createDuckDbTable(vectors); + + auto queryCtx = core::QueryCtx::create(executor_.get()); + auto outputDirectory = TempDirectoryPath::create(); + core::PlanNodeId tableWriteNodeId; + auto writerPlan = + PlanBuilder() + .values(vectors, /*parallelizable=*/true) + .tableWrite( + outputDirectory->getPath(), + partitionedBy_, + bucketProperty_ != nullptr ? bucketProperty_->bucketCount() : 0, + bucketProperty_ != nullptr ? bucketProperty_->bucketedBy() + : std::vector{}, + bucketProperty_ != nullptr + ? 
bucketProperty_->sortedBy() + : std::vector>{}) + .capturePlanNodeId(tableWriteNodeId) + .localPartition({}) + .tableWriteMerge() + .project({TableWriteTraits::rowCountColumnName()}) + .singleAggregation( + {}, + {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())}) + .planNode(); + const int taskWriterCount = 4; + const int taskPartitionedWriterCount = 8; + const int taskBucketWriterCount = 9; + const auto expectedNumWriterDrivers = bucketProperty_ != nullptr + ? taskBucketWriterCount + : partitionedBy_.empty() ? taskWriterCount + : taskPartitionedWriterCount; + const auto expectedNumRows = + numBatches * batchSize * expectedNumWriterDrivers; + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .queryCtx(queryCtx) + .maxDrivers(10) + .config( + core::QueryConfig::kTaskWriterCount, + std::to_string(taskWriterCount)) + .config( + core::QueryConfig::kTaskPartitionedWriterCount, + std::to_string(taskPartitionedWriterCount)) + .config( + core::QueryConfig::kTaskBucketedWriterCount, + std::to_string(taskBucketWriterCount)) + .plan(std::move(writerPlan)) + .assertResults(fmt::format("SELECT {}", expectedNumRows)); + auto planStats = exec::toPlanStats(task->taskStats()); + auto& tableWriteStats = planStats.at(tableWriteNodeId); + ASSERT_EQ(tableWriteStats.numDrivers, expectedNumWriterDrivers); +} + TEST_P(AllTableWriterTest, renameAndReorderColumns) { auto filePaths = makeFilePaths(5); auto vectors = makeVectors(filePaths.size(), 500); @@ -2841,6 +2900,7 @@ TEST_P(AllTableWriterTest, columnStatsDataTypes) { partitionedBy_, nullptr, makeLocationHandle(outputDirectory->getPath()))), + !partitionedBy_.empty(), false, CommitStrategy::kNoCommit)) .planNode(); @@ -2930,7 +2990,8 @@ TEST_P(AllTableWriterTest, columnStats) { partitionedBy_, bucketProperty_, makeLocationHandle(outputDirectory->getPath()))), - false, + !partitionedBy_.empty(), + bucketProperty_ != nullptr, commitStrategy_)) .planNode(); @@ -2957,8 +3018,9 @@ TEST_P(AllTableWriterTest, columnStats) { 
// null partition2_update x null null // null partition3_update x null null // - // Note that we can have multiple same partition_update, they're for different - // files, but for stats, we would only have one record for each partition + // Note that we can have multiple same partition_update, they're for + // different files, but for stats, we would only have one record for each + // partition // // For unpartitioned, expected result is: // Row Fragment Context partition c1_min_value @@ -3029,7 +3091,8 @@ TEST_P(AllTableWriterTest, columnStatsWithTableWriteMerge) { partitionedBy_, bucketProperty_, makeLocationHandle(outputDirectory->getPath()))), - false, + !partitionedBy_.empty(), + bucketProperty_ != nullptr, commitStrategy_)); auto mergeAggregationNode = generateAggregationNode( @@ -3066,8 +3129,9 @@ TEST_P(AllTableWriterTest, columnStatsWithTableWriteMerge) { // null partition2_update x null null // null partition3_update x null null // - // Note that we can have multiple same partition_update, they're for different - // files, but for stats, we would only have one record for each partition + // Note that we can have multiple same partition_update, they're for + // different files, but for stats, we would only have one record for each + // partition // // For unpartitioned, expected result is: // Row Fragment Context partition c1_min_value @@ -3478,13 +3542,17 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) { for (bool writerSpillEnabled : {false, true}) { { SCOPED_TRACE(fmt::format("writerSpillEnabled: {}", writerSpillEnabled)); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); - - std::atomic numInputs{0}; + auto queryPool = memory::memoryManager()->addRootPool( + "reclaimFromTableWriter", kQueryMemoryCapacity); + auto* arbitrator = 
memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); + + std::atomic_int numInputs{0}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::addInput", std::function(([&](Operator* op) { @@ -3499,8 +3567,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) { } const auto fakeAllocationSize = - arbitrator->stats().maxCapacityBytes - - op->pool()->parent()->reservedBytes(); + kQueryMemoryCapacity - op->pool()->parent()->reservedBytes(); if (writerSpillEnabled) { auto* buffer = op->pool()->allocate(fakeAllocationSize); op->pool()->free(buffer, fakeAllocationSize); @@ -3534,8 +3601,8 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) { .config(core::QueryConfig::kSpillEnabled, writerSpillEnabled) .config( core::QueryConfig::kWriterSpillEnabled, writerSpillEnabled) - // Set 0 file writer flush threshold to always trigger flush in - // test. + // Set 0 file writer flush threshold to always trigger flush + // in test. 
.config(core::QueryConfig::kWriterFlushThresholdBytes, 0) .plan(std::move(writerPlan)) .assertResults(fmt::format("SELECT {}", numRows)); @@ -3553,15 +3620,19 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) { .at(HiveDataSink::kEarlyFlushedRawBytes) .sum, 0); - ASSERT_EQ(arbitrator->stats().numFailures, 0); + ASSERT_EQ( + arbitrator->stats().numFailures, numPrevArbitrationFailures); } else { ASSERT_EQ( tableWriteStats->customStats.count( HiveDataSink::kEarlyFlushedRawBytes), 0); - ASSERT_EQ(arbitrator->stats().numFailures, 1); + ASSERT_EQ( + arbitrator->stats().numFailures, numPrevArbitrationFailures + 1); } - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0); + ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts); } waitForAllTasksToBeDeleted(3'000'000); } @@ -3590,11 +3661,15 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromSortTableWriter) { for (bool writerSpillEnabled : {false, true}) { { SCOPED_TRACE(fmt::format("writerSpillEnabled: {}", writerSpillEnabled)); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "reclaimFromSortTableWriter", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); const auto spillStats = common::globalSpillStats(); std::atomic numInputs{0}; @@ -3612,8 +3687,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, 
reclaimFromSortTableWriter) { } const auto fakeAllocationSize = - arbitrator->stats().maxCapacityBytes - - op->pool()->parent()->reservedBytes(); + kQueryMemoryCapacity - op->pool()->parent()->reservedBytes(); if (writerSpillEnabled) { auto* buffer = op->pool()->allocate(fakeAllocationSize); op->pool()->free(buffer, fakeAllocationSize); @@ -3651,13 +3725,18 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromSortTableWriter) { .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, writerSpillEnabled) .config(core::QueryConfig::kWriterSpillEnabled, writerSpillEnabled) - // Set 0 file writer flush threshold to always trigger flush in test. + // Set 0 file writer flush threshold to always trigger flush in + // test. .config(core::QueryConfig::kWriterFlushThresholdBytes, 0) .plan(std::move(writerPlan)) .assertResults(fmt::format("SELECT {}", numRows)); - ASSERT_EQ(arbitrator->stats().numFailures, writerSpillEnabled ? 0 : 1); - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0); + ASSERT_EQ( + arbitrator->stats().numFailures, + numPrevArbitrationFailures + (writerSpillEnabled ? 
0 : 1)); + ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts); waitForAllTasksToBeDeleted(3'000'000); const auto updatedSpillStats = common::globalSpillStats(); @@ -3696,11 +3775,16 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) { succinctBytes(testParam.bytesToReserve), succinctBytes(testParam.writerFlushThreshold))); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "writerFlushThreshold", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + const int numPrevShrinks = arbitrator->stats().numShrinks; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); memory::MemoryPool* compressionPool{nullptr}; SCOPED_TESTVALUE_SET( @@ -3729,8 +3813,8 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) { compressionPool->maybeReserve(testParam.bytesToReserve); } - const auto fakeAllocationSize = arbitrator->stats().maxCapacityBytes - - op->pool()->parent()->usedBytes(); + const auto fakeAllocationSize = + kQueryMemoryCapacity - op->pool()->parent()->usedBytes(); if (testParam.writerFlushThreshold == 0) { auto* buffer = op->pool()->allocate(fakeAllocationSize); op->pool()->free(buffer, fakeAllocationSize); @@ -3768,14 +3852,17 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) { ASSERT_EQ( arbitrator->stats().numFailures, - testParam.writerFlushThreshold == 0 ? 
0 : 1); - // We don't trigger reclaim on a writer if it doesn't meet the writer flush - // threshold. - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0); + numPrevArbitrationFailures + + (testParam.writerFlushThreshold == 0 ? 0 : 1)); + // We don't trigger reclaim on a writer if it doesn't meet the writer + // flush threshold. + ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts); ASSERT_GE(arbitrator->stats().numReclaimedBytes, testParam.bytesToReserve); waitForAllTasksToBeDeleted(3'000'000); queryCtx.reset(); - ASSERT_EQ(arbitrator->stats().numShrinks, 1); + ASSERT_EQ(arbitrator->stats().numShrinks, numPrevShrinks + 1); } } @@ -3798,11 +3885,15 @@ DEBUG_ONLY_TEST_F( createDuckDbTable(vectors); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "reclaimFromNonReclaimableTableWriter", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); std::atomic injectFakeAllocationOnce{true}; SCOPED_TESTVALUE_SET( @@ -3814,7 +3905,7 @@ DEBUG_ONLY_TEST_F( auto& pool = writer->getContext().getMemoryPool( dwrf::MemoryUsageCategory::GENERAL); const auto fakeAllocationSize = - arbitrator->stats().maxCapacityBytes - pool.reservedBytes(); + kQueryMemoryCapacity - pool.reservedBytes(); VELOX_ASSERT_THROW( pool.allocate(fakeAllocationSize), "Exceeded memory pool"); }))); @@ -3853,8 +3944,10 @@ DEBUG_ONLY_TEST_F( 
.plan(std::move(writerPlan)) .assertResults(fmt::format("SELECT {}", numRows)); - ASSERT_EQ(arbitrator->stats().numFailures, 1); - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 1); + ASSERT_EQ(arbitrator->stats().numFailures, numPrevArbitrationFailures + 1); + ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts + 1); waitForAllTasksToBeDeleted(); } @@ -3876,11 +3969,16 @@ DEBUG_ONLY_TEST_F( } createDuckDbTable(vectors); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "arbitrationFromTableWriterWithNoMoreInput", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + const int numPrevReclaimedBytes = arbitrator->stats().numReclaimedBytes; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); std::atomic writerNoMoreInput{false}; SCOPED_TESTVALUE_SET( @@ -3905,8 +4003,8 @@ DEBUG_ONLY_TEST_F( if (!injectGetOutputOnce.exchange(false)) { return; } - const auto fakeAllocationSize = arbitrator->stats().maxCapacityBytes - - op->pool()->parent()->reservedBytes(); + const auto fakeAllocationSize = + kQueryMemoryCapacity - op->pool()->parent()->reservedBytes(); auto* buffer = op->pool()->allocate(fakeAllocationSize); op->pool()->free(buffer, fakeAllocationSize); }))); @@ -3944,9 +4042,11 @@ DEBUG_ONLY_TEST_F( .plan(std::move(writerPlan)) .assertResults(fmt::format("SELECT {}", numRows)); - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0); - 
ASSERT_EQ(arbitrator->stats().numFailures, 0); - ASSERT_GT(arbitrator->stats().numReclaimedBytes, 0); + ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts); + ASSERT_EQ(arbitrator->stats().numFailures, numPrevArbitrationFailures); + ASSERT_GT(arbitrator->stats().numReclaimedBytes, numPrevReclaimedBytes); waitForAllTasksToBeDeleted(); } @@ -3972,11 +4072,15 @@ DEBUG_ONLY_TEST_F( createDuckDbTable(vectors); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "reclaimFromNonReclaimableSortTableWriter", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); std::atomic injectFakeAllocationOnce{true}; SCOPED_TESTVALUE_SET( @@ -3993,8 +4097,8 @@ DEBUG_ONLY_TEST_F( if (!injectFakeAllocationOnce.exchange(false)) { return; } - const auto fakeAllocationSize = arbitrator->stats().maxCapacityBytes - - pool->parent()->reservedBytes(); + const auto fakeAllocationSize = + kQueryMemoryCapacity - pool->parent()->reservedBytes(); VELOX_ASSERT_THROW( pool->allocate(fakeAllocationSize), "Exceeded memory pool"); }))); @@ -4042,8 +4146,10 @@ DEBUG_ONLY_TEST_F( .plan(std::move(writerPlan)) .assertResults(fmt::format("SELECT {}", numRows)); - ASSERT_EQ(arbitrator->stats().numFailures, 1); - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 1); + ASSERT_EQ(arbitrator->stats().numFailures, numPrevArbitrationFailures + 1); +
ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts + 1); const auto updatedSpillStats = common::globalSpillStats(); ASSERT_EQ(updatedSpillStats, spillStats); waitForAllTasksToBeDeleted(); @@ -4065,13 +4171,14 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableFileWriteError) { createDuckDbTable(vectors); - auto memoryManager = - createMemoryManager(memoryCapacity, kMemoryPoolInitCapacity); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), memoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); - std::atomic injectWriterErrorOnce{true}; + auto queryPool = memory::memoryManager()->addRootPool( + "tableFileWriteError", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); + + std::atomic_bool injectWriterErrorOnce{true}; SCOPED_TESTVALUE_SET( "facebook::velox::dwrf::Writer::write", std::function(([&](dwrf::Writer* writer) { @@ -4104,8 +4211,8 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableFileWriteError) { // Set 0 file writer flush threshold to always reclaim memory from // file writer. .config(core::QueryConfig::kWriterFlushThresholdBytes, 0) - // Set stripe size to extreme large to avoid writer internal triggered - // flush. + // Set stripe size to extreme large to avoid writer internal + // triggered flush. 
.connectorSessionProperty( kHiveConnectorId, connector::hive::HiveConfig::kOrcWriterMaxStripeSizeSession, @@ -4134,20 +4241,19 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteSpillUseMoreMemory) { vectors.push_back(fuzzer.fuzzInputRow(rowType_)); } - auto memoryManager = - createMemoryManager(memoryCapacity, kMemoryPoolInitCapacity); - auto arbitrator = memoryManager->arbitrator(); + auto queryPool = memory::memoryManager()->addRootPool( + "tableWriteSpillUseMoreMemory", kQueryMemoryCapacity / 4); + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity / 4); - std::shared_ptr queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), memoryCapacity / 8); - std::shared_ptr fakeQueryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), memoryCapacity); - auto fakePool = fakeQueryCtx->pool()->addLeafChild( - "fakePool", true, FakeMemoryReclaimer::create()); + auto fakeLeafPool = queryCtx->pool()->addLeafChild( + "fakeLeaf", true, FakeMemoryReclaimer::create()); + const int fakeAllocationSize = kQueryMemoryCapacity * 3 / 16; TestAllocation injectedFakeAllocation{ - fakePool.get(), - fakePool->allocate(memoryCapacity * 3 / 4), - memoryCapacity * 3 / 4}; + fakeLeafPool.get(), + fakeLeafPool->allocate(fakeAllocationSize), + fakeAllocationSize}; void* allocatedBuffer; TestAllocation injectedWriterAllocation; @@ -4159,21 +4265,21 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteSpillUseMoreMemory) { auto& pool = writer->getContext().getMemoryPool( dwrf::MemoryUsageCategory::GENERAL); injectedWriterAllocation.pool = &pool; - injectedWriterAllocation.size = memoryCapacity / 8; + injectedWriterAllocation.size = kQueryMemoryCapacity / 8; injectedWriterAllocation.buffer = pool.allocate(injectedWriterAllocation.size); }))); - // Free the extra fake memory allocations to make memory pool state consistent - // at the end of test. 
- std::atomic clearAllocationOnce{true}; + // Free the extra fake memory allocations to make memory pool state + // consistent at the end of test. + std::atomic_bool clearAllocationOnce{true}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Task::setError", std::function(([&](Task* task) { if (!clearAllocationOnce.exchange(false)) { return; } - ASSERT_EQ(injectedWriterAllocation.size, memoryCapacity / 8); + ASSERT_EQ(injectedWriterAllocation.size, kQueryMemoryCapacity / 8); injectedWriterAllocation.free(); }))); @@ -4190,10 +4296,11 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteSpillUseMoreMemory) { .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kWriterSpillEnabled, true) - // Set 0 file writer flush threshold to always trigger flush in test. + // Set 0 file writer flush threshold to always trigger flush in + // test. .config(core::QueryConfig::kWriterFlushThresholdBytes, 0) - // Set stripe size to extreme large to avoid writer internal triggered - // flush. + // Set stripe size to extreme large to avoid writer internal + // triggered flush. 
.connectorSessionProperty( kHiveConnectorId, connector::hive::HiveConfig::kOrcWriterMaxStripeSizeSession, @@ -4222,16 +4329,23 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteReclaimOnClose) { numRows += vectors.back()->size(); } - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - auto fakeQueryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - auto fakePool = fakeQueryCtx->pool()->addLeafChild( - "fakePool", true, FakeMemoryReclaimer::create()); + auto* arbitrator = memory::memoryManager()->arbitrator(); + auto queryPool = memory::memoryManager()->addRootPool( + "tableWriteSpillUseMoreMemory", kQueryMemoryCapacity); + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); - std::atomic writerNoMoreInput{false}; + auto fakeQueryPool = + memory::memoryManager()->addRootPool("fake", kQueryMemoryCapacity); + auto fakeQueryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(fakeQueryPool)); + ASSERT_EQ(fakeQueryCtx->pool()->capacity(), kQueryMemoryCapacity); + + auto fakeLeafPool = fakeQueryCtx->pool()->addLeafChild( + "fakeLeaf", true, FakeMemoryReclaimer::create()); + + std::atomic_bool writerNoMoreInput{false}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::noMoreInput", std::function(([&](Operator* op) { @@ -4252,14 +4366,13 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteReclaimOnClose) { if (!maybeReserveInjectOnce.exchange(false)) { return; } - // The injection memory allocation to cause maybeReserve on writer close - // to trigger memory arbitration. The latter tries to reclaim memory - // from this file writer. 
- const size_t injectAllocationSize = - pool->freeBytes() + arbitrator->stats().freeCapacityBytes; + // The injection memory allocation to cause maybeReserve on writer + // close to trigger memory arbitration. The latter tries to reclaim + // memory from this file writer. + const size_t injectAllocationSize = kQueryMemoryCapacity; fakeAllocation = TestAllocation{ - .pool = fakePool.get(), - .buffer = fakePool->allocate(injectAllocationSize), + .pool = fakeLeafPool.get(), + .buffer = fakeLeafPool->allocate(injectAllocationSize), .size = injectAllocationSize}; })); @@ -4312,8 +4425,11 @@ DEBUG_ONLY_TEST_F( const auto expectedResult = runWriteTask(vectors, nullptr, false, 1, pool(), kHiveConnectorId, false) .data; - auto queryCtx = - newQueryCtx(memory::memoryManager(), executor_.get(), memoryCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "tableWriteSpillUseMoreMemory", kQueryMemoryCapacity); + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); std::atomic_bool writerCloseWaitFlag{true}; folly::EventCount writerCloseWait; diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 27baada36c32..ae0a40b26019 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -434,7 +434,7 @@ PlanBuilder& PlanBuilder::tableWrite( connector::hive::LocationHandle::TableType::kNew, outputFileName); std::shared_ptr bucketProperty; - if (!partitionBy.empty() && bucketCount != 0) { + if (bucketCount != 0) { bucketProperty = buildHiveBucketProperty(rowType, bucketCount, bucketedBy, sortBy); } @@ -471,7 +471,8 @@ PlanBuilder& PlanBuilder::tableWrite( rowType->names(), aggregationNode, insertHandle, - false, + !partitionBy.empty(), + bucketProperty != nullptr, TableWriteTraits::outputType(aggregationNode), connector::CommitStrategy::kNoCommit, planNode_);