From 59f6c6617faea0bbef5dc82217f1760344bd0797 Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Mon, 25 Sep 2023 16:15:12 +0200
Subject: [PATCH 1/5] Rename getMaxPartitionWritersBasedOnMemory

---
 .../io/trino/operator/output/SkewedPartitionRebalancer.java | 4 ++--
 .../java/io/trino/sql/planner/LocalExecutionPlanner.java    | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java b/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
index e88bfb5dc23a..458b4ccd17d3 100644
--- a/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
+++ b/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
@@ -153,7 +153,7 @@ public static PartitionFunction createPartitionFunction(
                 IntStream.range(0, bucketCount).toArray());
     }
 
-    public static int getMaxPartitionWritersBasedOnMemory(Session session)
+    public static int getMaxWritersBasedOnMemory(Session session)
     {
         return (int) ceil((double) getQueryMaxMemoryPerNode(session).toBytes() / getMaxMemoryPerPartitionWriter(session).toBytes());
     }
@@ -161,7 +161,7 @@ public static int getMaxPartitionWritersBasedOnMemory(Session session)
     public static int getScaleWritersMaxSkewedPartitions(Session session)
    {
         // Set the value of maxSkewedPartitions to scale to 60% of maximum number of writers possible per node.
-        return (int) (getMaxPartitionWritersBasedOnMemory(session) * 0.60);
+        return (int) (getMaxWritersBasedOnMemory(session) * 0.60);
     }
 
     public static int getTaskCount(PartitioningScheme partitioningScheme)
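The renamed method's arithmetic is worth spelling out before the planner side of the rename: the per-node writer ceiling is ceil(query_max_memory_per_node / max_memory_per_partition_writer), and the skew rebalancer scales to at most 60% of that ceiling. A minimal, self-contained sketch of the two formulas, with assumed memory limits (8GB per node, 256MB per partition writer) standing in for the real session values:

```java
import static java.lang.Math.ceil;

public class WriterLimitSketch
{
    // Assumed example limits, not Trino defaults.
    static final long QUERY_MAX_MEMORY_PER_NODE_BYTES = 8L * 1024 * 1024 * 1024;
    static final long MAX_MEMORY_PER_PARTITION_WRITER_BYTES = 256L * 1024 * 1024;

    static int maxWritersBasedOnMemory()
    {
        // Mirrors getMaxWritersBasedOnMemory: ceil(8GB / 256MB) = 32 writers
        // before the per-node memory budget is exhausted.
        return (int) ceil((double) QUERY_MAX_MEMORY_PER_NODE_BYTES / MAX_MEMORY_PER_PARTITION_WRITER_BYTES);
    }

    static int scaleWritersMaxSkewedPartitions()
    {
        // Mirrors getScaleWritersMaxSkewedPartitions: 60% of the ceiling.
        return (int) (maxWritersBasedOnMemory() * 0.60);
    }

    public static void main(String[] args)
    {
        System.out.println(maxWritersBasedOnMemory());         // 32
        System.out.println(scaleWritersMaxSkewedPartitions()); // 19
    }
}
```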
diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
index 8b9b5541e5e4..b2e2051a12f8 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
@@ -335,7 +335,7 @@
 import static io.trino.operator.join.NestedLoopJoinOperator.NestedLoopJoinOperatorFactory;
 import static io.trino.operator.output.SkewedPartitionRebalancer.checkCanScalePartitionsRemotely;
 import static io.trino.operator.output.SkewedPartitionRebalancer.createPartitionFunction;
-import static io.trino.operator.output.SkewedPartitionRebalancer.getMaxPartitionWritersBasedOnMemory;
+import static io.trino.operator.output.SkewedPartitionRebalancer.getMaxWritersBasedOnMemory;
 import static io.trino.operator.output.SkewedPartitionRebalancer.getScaleWritersMaxSkewedPartitions;
 import static io.trino.operator.output.SkewedPartitionRebalancer.getTaskCount;
 import static io.trino.operator.window.pattern.PhysicalValuePointer.CLASSIFIER;
@@ -588,7 +588,7 @@ public LocalExecutionPlan plan(
             // Consider memory while calculating the number of writers. This is to avoid creating too many task buckets.
             int partitionedWriterCount = min(
                     getTaskPartitionedWriterCount(taskContext.getSession()),
-                    previousPowerOfTwo(getMaxPartitionWritersBasedOnMemory(taskContext.getSession())));
+                    previousPowerOfTwo(getMaxWritersBasedOnMemory(taskContext.getSession())));
             // Keep the task bucket count to 50% of total local writers
             int taskBucketCount = (int) ceil(0.5 * partitionedWriterCount);
             skewedPartitionRebalancer = Optional.of(new SkewedPartitionRebalancer(
@@ -3505,7 +3505,7 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
             return 1;
         }
 
-        int maxWritersBasedOnMemory = getMaxPartitionWritersBasedOnMemory(session);
+        int maxWritersBasedOnMemory = getMaxWritersBasedOnMemory(session);
         if (partitioningScheme.isPresent()) {
             // The default value of partitioned writer count is 32 which is high enough to use it
             // for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many

From e03aa64d96881d70d00f2629721f960044565a48 Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Mon, 25 Sep 2023 16:20:06 +0200
Subject: [PATCH 2/5] Fix using incorrect writer count for partitioned insert

---
 .../main/java/io/trino/sql/planner/LocalExecutionPlanner.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
index b2e2051a12f8..14ca8584dac8 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
@@ -3510,7 +3510,7 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
             // The default value of partitioned writer count is 32 which is high enough to use it
             // for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many
             // small files since when scaling is disabled only single writer will handle a single partition.
-            int partitionedWriterCount = getTaskWriterCount(session);
+            int partitionedWriterCount = getTaskPartitionedWriterCount(session);
             if (isLocalScaledWriterExchange(source)) {
                 partitionedWriterCount = connectorScalingOptions.perTaskMaxScaledWriterCount()
                         .map(writerCount -> min(writerCount, getTaskPartitionedWriterCount(session)))
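The one-line fix above matters more than it looks: before it, a partitioned insert that did not go through a local scaled-writer exchange picked up getTaskWriterCount, the unpartitioned task.writer-count whose default is 1 (visible in the TaskManagerConfig context in patch 5), so such inserts ran with a single writer per task. A sketch of the corrected selection, with hypothetical counts standing in for the two session properties:

```java
import java.util.Optional;

public class WriterCountSelectionSketch
{
    static final int TASK_WRITER_COUNT = 1;              // assumed task.writer-count
    static final int TASK_PARTITIONED_WRITER_COUNT = 32; // assumed task.partitioned-writer-count

    static int writerCount(Optional<String> partitioningScheme)
    {
        if (partitioningScheme.isPresent()) {
            // Before the fix this branch returned TASK_WRITER_COUNT, so a
            // partitioned insert without local scaling ran with one writer.
            return TASK_PARTITIONED_WRITER_COUNT;
        }
        return TASK_WRITER_COUNT;
    }

    public static void main(String[] args)
    {
        System.out.println(writerCount(Optional.of("hash(ds)"))); // 32 after the fix, 1 before
        System.out.println(writerCount(Optional.empty()));        // 1
    }
}
```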
From 77bc978878e8da30882afcb8b14214989dc7fae2 Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Mon, 25 Sep 2023 16:26:32 +0200
Subject: [PATCH 3/5] Update comment

---
 .../java/io/trino/sql/planner/LocalExecutionPlanner.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
index 14ca8584dac8..d24c6a7faee7 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
@@ -3507,9 +3507,10 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
 
         int maxWritersBasedOnMemory = getMaxWritersBasedOnMemory(session);
         if (partitioningScheme.isPresent()) {
-            // The default value of partitioned writer count is 32 which is high enough to use it
-            // for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many
-            // small files since when scaling is disabled only single writer will handle a single partition.
+            // The default value of partitioned writer count is 2 * number_of_cores (capped to 64), which is high
+            // enough to use it for cases with or without scaling enabled. Additionally, it doesn't lead
+            // to too many small files when scaling is disabled, because a single partition will be written by
+            // a single writer only.
             int partitionedWriterCount = getTaskPartitionedWriterCount(session);
             if (isLocalScaledWriterExchange(source)) {
                 partitionedWriterCount = connectorScalingOptions.perTaskMaxScaledWriterCount()
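The updated comment leans on one property of the local partitioned exchange: with scaling disabled, each partition key is routed to exactly one of the N writers, so raising the writer count does not multiply the files per partition. An illustrative sketch of that routing invariant; the modulo hash here is a stand-in, not Trino's actual partition function:

```java
public class PartitionToWriterSketch
{
    static int writerIndexFor(String partitionValue, int writerCount)
    {
        // Deterministic routing: the same partition value always lands on
        // the same writer, so one partition produces files from one writer.
        return Math.floorMod(partitionValue.hashCode(), writerCount);
    }

    public static void main(String[] args)
    {
        int writers = 64;
        System.out.println(writerIndexFor("ds=2023-09-25", writers));
        System.out.println(writerIndexFor("ds=2023-09-25", writers)); // same index
    }
}
```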
From bca65aeda8e6bc331ebdf12d3b9162d13a84dd2c Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Mon, 25 Sep 2023 16:32:48 +0200
Subject: [PATCH 4/5] Extract getPartitionedWriterCountBasedOnMemory

---
 .../sql/planner/LocalExecutionPlanner.java | 21 ++++++++++++-------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
index d24c6a7faee7..33007a0ee16f 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
@@ -585,10 +585,7 @@ public LocalExecutionPlan plan(
         int taskCount = getTaskCount(partitioningScheme);
         if (checkCanScalePartitionsRemotely(taskContext.getSession(), taskCount, partitioningScheme.getPartitioning().getHandle(), nodePartitioningManager)) {
             partitionFunction = createPartitionFunction(taskContext.getSession(), nodePartitioningManager, partitioningScheme, partitionChannelTypes);
-            // Consider memory while calculating the number of writers. This is to avoid creating too many task buckets.
-            int partitionedWriterCount = min(
-                    getTaskPartitionedWriterCount(taskContext.getSession()),
-                    previousPowerOfTwo(getMaxWritersBasedOnMemory(taskContext.getSession())));
+            int partitionedWriterCount = getPartitionedWriterCountBasedOnMemory(taskContext.getSession());
             // Keep the task bucket count to 50% of total local writers
             int taskBucketCount = (int) ceil(0.5 * partitionedWriterCount);
             skewedPartitionRebalancer = Optional.of(new SkewedPartitionRebalancer(
@@ -3505,7 +3502,6 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
             return 1;
         }
 
-        int maxWritersBasedOnMemory = getMaxWritersBasedOnMemory(session);
         if (partitioningScheme.isPresent()) {
             // The default value of partitioned writer count is 2 * number_of_cores (capped to 64), which is high
             // enough to use it for cases with or without scaling enabled. Additionally, it doesn't lead
@@ -3517,8 +3513,7 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
                         .map(writerCount -> min(writerCount, getTaskPartitionedWriterCount(session)))
                         .orElse(getTaskPartitionedWriterCount(session));
             }
-            // Consider memory while calculating writer count.
-            return min(partitionedWriterCount, previousPowerOfTwo(maxWritersBasedOnMemory));
+            return getPartitionedWriterCountBasedOnMemory(partitionedWriterCount, session);
         }
 
         int unpartitionedWriterCount = getTaskWriterCount(session);
@@ -3528,7 +3523,7 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
                     .map(writerCount -> min(writerCount, getTaskScaleWritersMaxWriterCount(session)))
                     .orElse(getTaskScaleWritersMaxWriterCount(session));
         }
         // Consider memory while calculating writer count.
-        return min(unpartitionedWriterCount, maxWritersBasedOnMemory);
+        return min(unpartitionedWriterCount, getMaxWritersBasedOnMemory(session));
     }
 
     private boolean isSingleGatheringExchange(PlanNode node)
@@ -4112,6 +4107,16 @@ private OperatorFactory createHashAggregationOperatorFactory(
         }
     }
 
+    private int getPartitionedWriterCountBasedOnMemory(Session session)
+    {
+        return getPartitionedWriterCountBasedOnMemory(getTaskPartitionedWriterCount(session), session);
+    }
+
+    private int getPartitionedWriterCountBasedOnMemory(int partitionedWriterCount, Session session)
+    {
+        return min(partitionedWriterCount, previousPowerOfTwo(getMaxWritersBasedOnMemory(session)));
+    }
+
     private static Optional<PartialAggregationController> createPartialAggregationController(Optional<DataSize> maxPartialAggregationMemorySize, AggregationNode.Step step, Session session)
     {
         return maxPartialAggregationMemorySize.isPresent() && step.isOutputPartial() && isAdaptivePartialAggregationEnabled(session) ?
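The helper extracted above combines two constraints: the configured partitioned writer count, and the memory-derived ceiling rounded down to a power of two (the partitioned writer count must stay a power of two, per the restriction documented in patch 5). A sketch of the clamp; previousPowerOfTwo is reimplemented locally and is an assumption about the utility the planner imports:

```java
public class PartitionedWriterClampSketch
{
    static int previousPowerOfTwo(int value)
    {
        // Largest power of two <= value, for value >= 1.
        return Integer.highestOneBit(value);
    }

    static int partitionedWriterCountBasedOnMemory(int configuredCount, int maxWritersBasedOnMemory)
    {
        // Mirrors the extracted getPartitionedWriterCountBasedOnMemory overload.
        return Math.min(configuredCount, previousPowerOfTwo(maxWritersBasedOnMemory));
    }

    public static void main(String[] args)
    {
        // 64 writers configured, but memory allows only 24: round down to 16.
        System.out.println(partitionedWriterCountBasedOnMemory(64, 24)); // 16
    }
}
```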
From a5a5b8564b535e1d946443a3626098218f35bfa5 Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Mon, 25 Sep 2023 17:12:52 +0200
Subject: [PATCH 5/5] Simplify scaled and partitioned writer count (plus update doc)

---
 .../io/trino/execution/TaskManagerConfig.java     |  4 ++--
 .../trino/execution/TestTaskManagerConfig.java    | 16 +++++++++-------
 docs/src/main/sphinx/admin/properties-task.md     |  2 +-
 .../sphinx/admin/properties-writer-scaling.md     |  2 +-
 4 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java
index c5856518a0ee..89ac8fbc2a21 100644
--- a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java
+++ b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java
@@ -82,14 +82,14 @@ public class TaskManagerConfig
     // Set the value of default max writer count to the number of processors * 2 and cap it to 64. We can set this value
     // higher because preferred write partitioning is always enabled for local exchange thus partitioned inserts will never
     // use this property. Additionally, we have a mechanism to stop scaling if local memory utilization is high.
-    private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32) * 2;
+    private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount() * 2, 64);
     private int writerCount = 1;
     // Default value of partitioned task writer count should be above 1, otherwise it can create a plan
     // with a single gather exchange node on the coordinator due to a single available processor. Whereas,
     // on the worker nodes due to more available processors, the default value could be above 1. Therefore,
     // it can cause error due to config mismatch during execution. Additionally, cap it to 64 in order to
     // avoid small pages produced by local partitioning exchanges.
-    private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32) * 2;
+    private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount() * 2), 2), 64);
     // Default value of task concurrency should be above 1, otherwise it can create a plan with a single gather
     // exchange node on the coordinator due to a single available processor. Whereas, on the worker nodes due to
     // more available processors, the default value could be above 1. Therefore, it can cause error due to config

diff --git a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java
index 83e292a16144..0c8b9d2fbb70 100644
--- a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java
+++ b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java
@@ -34,7 +34,8 @@ public class TestTaskManagerConfig
 {
     private static final int DEFAULT_PROCESSOR_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32);
-    private static final int DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT = min(getAvailablePhysicalProcessorCount(), 32) * 2;
+    private static final int DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT = min(getAvailablePhysicalProcessorCount() * 2, 64);
+    private static final int DEFAULT_PARTITIONED_WRITER_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount() * 2), 2), 64);
 
     @Test
     public void testDefaults()
@@ -66,7 +67,7 @@ public void testDefaults()
                 .setScaleWritersEnabled(true)
                 .setScaleWritersMaxWriterCount(DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT)
                 .setWriterCount(1)
-                .setPartitionedWriterCount(DEFAULT_PROCESSOR_COUNT * 2)
+                .setPartitionedWriterCount(DEFAULT_PARTITIONED_WRITER_COUNT)
                 .setTaskConcurrency(DEFAULT_PROCESSOR_COUNT)
                 .setHttpResponseThreads(100)
                 .setHttpTimeoutThreads(3)
@@ -84,7 +85,8 @@ public void testExplicitPropertyMappings()
     {
         int processorCount = DEFAULT_PROCESSOR_COUNT == 32 ? 16 : 32;
-        int maxWriterCount = DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT == 32 ? 16 : 32;
+        int scaleWritersMaxWriterCount = DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT == 32 ? 16 : 32;
+        int partitionedWriterCount = DEFAULT_PARTITIONED_WRITER_COUNT == 32 ? 16 : 32;
         Map<String, String> properties = ImmutableMap.<String, String>builder()
                 .put("experimental.thread-per-driver-scheduler-enabled", "true")
                 .put("task.initial-splits-per-node", "1")
@@ -110,9 +112,9 @@ public void testExplicitPropertyMappings()
                 .put("driver.max-page-partitioning-buffer-size", "40MB")
                 .put("driver.page-partitioning-buffer-pool-size", "0")
                 .put("task.scale-writers.enabled", "false")
-                .put("task.scale-writers.max-writer-count", Integer.toString(maxWriterCount))
+                .put("task.scale-writers.max-writer-count", Integer.toString(scaleWritersMaxWriterCount))
                 .put("task.writer-count", "4")
-                .put("task.partitioned-writer-count", Integer.toString(processorCount))
+                .put("task.partitioned-writer-count", Integer.toString(partitionedWriterCount))
                 .put("task.concurrency", Integer.toString(processorCount))
                 .put("task.http-response-threads", "4")
                 .put("task.http-timeout-threads", "10")
@@ -151,9 +153,9 @@ public void testExplicitPropertyMappings()
                 .setMaxPagePartitioningBufferSize(DataSize.of(40, Unit.MEGABYTE))
                 .setPagePartitioningBufferPoolSize(0)
                 .setScaleWritersEnabled(false)
-                .setScaleWritersMaxWriterCount(maxWriterCount)
+                .setScaleWritersMaxWriterCount(scaleWritersMaxWriterCount)
                 .setWriterCount(4)
-                .setPartitionedWriterCount(processorCount)
+                .setPartitionedWriterCount(partitionedWriterCount)
                 .setTaskConcurrency(processorCount)
                 .setHttpResponseThreads(4)
                 .setHttpTimeoutThreads(10)
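The rewritten defaults read as a simplification rather than a behavior change: min(procs, 32) * 2 equals min(procs * 2, 64) for every core count, and the partitioned expression differs only on a single-core node, where the default drops from 4 to 2. A quick exhaustive check; nextPowerOfTwo (smallest power of two >= the argument) is reimplemented locally and assumed to match the utility the config class uses:

```java
public class DefaultWriterCountCheck
{
    static int nextPowerOfTwo(int value)
    {
        int highest = Integer.highestOneBit(value);
        return highest == value ? value : highest << 1;
    }

    public static void main(String[] args)
    {
        for (int cores = 1; cores <= 1024; cores++) {
            int oldScale = Math.min(cores, 32) * 2;
            int newScale = Math.min(cores * 2, 64);
            int oldPartitioned = Math.min(Math.max(nextPowerOfTwo(cores), 2), 32) * 2;
            int newPartitioned = Math.min(Math.max(nextPowerOfTwo(cores * 2), 2), 64);
            if (oldScale != newScale || oldPartitioned != newPartitioned) {
                // Prints only for cores=1: partitioned default 4 -> 2.
                System.out.printf("cores=%d old=(%d, %d) new=(%d, %d)%n",
                        cores, oldScale, oldPartitioned, newScale, newPartitioned);
            }
        }
    }
}
```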
diff --git a/docs/src/main/sphinx/admin/properties-task.md b/docs/src/main/sphinx/admin/properties-task.md
index 376b7f73857c..ff109ae5ff18 100644
--- a/docs/src/main/sphinx/admin/properties-task.md
+++ b/docs/src/main/sphinx/admin/properties-task.md
@@ -131,7 +131,7 @@ allocates a certain amount of memory for buffering.
 
 - **Type:** {ref}`prop-type-integer`
 - **Restrictions:** Must be a power of two
-- **Default value:** The number of physical CPUs of the node, with a minimum value of 2 and a maximum of 32
+- **Default value:** Two times the number of physical CPUs of the node, rounded up to the nearest power of two, with a minimum value of 2 and a maximum of 64
 - **Session property:** `task_partitioned_writer_count`
 
 The number of concurrent writer threads per worker per query when

diff --git a/docs/src/main/sphinx/admin/properties-writer-scaling.md b/docs/src/main/sphinx/admin/properties-writer-scaling.md
index a2b66abaae2d..70f4d02dfb00 100644
--- a/docs/src/main/sphinx/admin/properties-writer-scaling.md
+++ b/docs/src/main/sphinx/admin/properties-writer-scaling.md
@@ -39,7 +39,7 @@ writing.
 
 ## `task.scale-writers.max-writer-count`
 
 - **Type:** {ref}`prop-type-integer`
-- **Default value:** The number of physical CPUs of the node with a maximum of 32
+- **Default value:** Two times the number of physical CPUs of the node, with a maximum of 64
 
 Maximum number of concurrent writers per task up to which the task can be
 scaled when `task.scale-writers.enabled` is set. Increasing this value may improve the
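One last consistency check on the documentation: `task.partitioned-writer-count` is restricted to powers of two, and the new default min(max(nextPowerOfTwo(2 * cores), 2), 64) always satisfies that restriction, so an out-of-the-box configuration passes validation. A sketch of the check, reusing the local nextPowerOfTwo assumption from the previous example:

```java
public class PowerOfTwoRestrictionCheck
{
    static int nextPowerOfTwo(int value)
    {
        int highest = Integer.highestOneBit(value);
        return highest == value ? value : highest << 1;
    }

    static boolean isPowerOfTwo(int value)
    {
        return value > 0 && (value & (value - 1)) == 0;
    }

    public static void main(String[] args)
    {
        for (int cores = 1; cores <= 1024; cores++) {
            int defaultCount = Math.min(Math.max(nextPowerOfTwo(cores * 2), 2), 64);
            if (!isPowerOfTwo(defaultCount)) {
                throw new AssertionError("non power-of-two default for cores=" + cores);
            }
        }
        System.out.println("default is a power of two for all checked core counts");
    }
}
```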