Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert cleanups #19147

Merged
merged 5 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ public static PartitionFunction createPartitionFunction(
IntStream.range(0, bucketCount).toArray());
}

public static int getMaxPartitionWritersBasedOnMemory(Session session)
public static int getMaxWritersBasedOnMemory(Session session)
{
return (int) ceil((double) getQueryMaxMemoryPerNode(session).toBytes() / getMaxMemoryPerPartitionWriter(session).toBytes());
}

public static int getScaleWritersMaxSkewedPartitions(Session session)
{
// Set the value of maxSkewedPartitions to scale to 60% of maximum number of writers possible per node.
return (int) (getMaxPartitionWritersBasedOnMemory(session) * 0.60);
return (int) (getMaxWritersBasedOnMemory(session) * 0.60);
}

public static int getTaskCount(PartitioningScheme partitioningScheme)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@
import static io.trino.operator.join.NestedLoopJoinOperator.NestedLoopJoinOperatorFactory;
import static io.trino.operator.output.SkewedPartitionRebalancer.checkCanScalePartitionsRemotely;
import static io.trino.operator.output.SkewedPartitionRebalancer.createPartitionFunction;
import static io.trino.operator.output.SkewedPartitionRebalancer.getMaxPartitionWritersBasedOnMemory;
import static io.trino.operator.output.SkewedPartitionRebalancer.getMaxWritersBasedOnMemory;
import static io.trino.operator.output.SkewedPartitionRebalancer.getScaleWritersMaxSkewedPartitions;
import static io.trino.operator.output.SkewedPartitionRebalancer.getTaskCount;
import static io.trino.operator.window.pattern.PhysicalValuePointer.CLASSIFIER;
Expand Down Expand Up @@ -585,10 +585,7 @@ public LocalExecutionPlan plan(
int taskCount = getTaskCount(partitioningScheme);
if (checkCanScalePartitionsRemotely(taskContext.getSession(), taskCount, partitioningScheme.getPartitioning().getHandle(), nodePartitioningManager)) {
partitionFunction = createPartitionFunction(taskContext.getSession(), nodePartitioningManager, partitioningScheme, partitionChannelTypes);
// Consider memory while calculating the number of writers. This is to avoid creating too many task buckets.
int partitionedWriterCount = min(
getTaskPartitionedWriterCount(taskContext.getSession()),
previousPowerOfTwo(getMaxPartitionWritersBasedOnMemory(taskContext.getSession())));
int partitionedWriterCount = getPartitionedWriterCountBasedOnMemory(taskContext.getSession());
// Keep the task bucket count to 50% of total local writers
int taskBucketCount = (int) ceil(0.5 * partitionedWriterCount);
skewedPartitionRebalancer = Optional.of(new SkewedPartitionRebalancer(
Expand Down Expand Up @@ -3505,19 +3502,18 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
return 1;
}

int maxWritersBasedOnMemory = getMaxPartitionWritersBasedOnMemory(session);
if (partitioningScheme.isPresent()) {
// The default value of partitioned writer count is 32 which is high enough to use it
// for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many
// small files since when scaling is disabled only single writer will handle a single partition.
int partitionedWriterCount = getTaskWriterCount(session);
// The default value of partitioned writer count is 2 * number_of_cores (capped to 64) which is high
// enough to use it for cases with or without scaling enabled. Additionally, it doesn't lead
// to too many small files when scaling is disabled because single partition will be written by
// a single writer only.
int partitionedWriterCount = getTaskPartitionedWriterCount(session);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth testing ?

if (isLocalScaledWriterExchange(source)) {
partitionedWriterCount = connectorScalingOptions.perTaskMaxScaledWriterCount()
.map(writerCount -> min(writerCount, getTaskPartitionedWriterCount(session)))
.orElse(getTaskPartitionedWriterCount(session));
}
// Consider memory while calculating writer count.
return min(partitionedWriterCount, previousPowerOfTwo(maxWritersBasedOnMemory));
return getPartitionedWriterCountBasedOnMemory(partitionedWriterCount, session);
}

int unpartitionedWriterCount = getTaskWriterCount(session);
Expand All @@ -3527,7 +3523,7 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
.orElse(getTaskScaleWritersMaxWriterCount(session));
}
// Consider memory while calculating writer count.
return min(unpartitionedWriterCount, maxWritersBasedOnMemory);
return min(unpartitionedWriterCount, getMaxWritersBasedOnMemory(session));
}

private boolean isSingleGatheringExchange(PlanNode node)
Expand Down Expand Up @@ -4111,6 +4107,16 @@ private OperatorFactory createHashAggregationOperatorFactory(
}
}

private int getPartitionedWriterCountBasedOnMemory(Session session)
{
return getPartitionedWriterCountBasedOnMemory(getTaskPartitionedWriterCount(session), session);
}

private int getPartitionedWriterCountBasedOnMemory(int partitionedWriterCount, Session session)
{
return min(partitionedWriterCount, previousPowerOfTwo(getMaxWritersBasedOnMemory(session)));
}

private static Optional<PartialAggregationController> createPartialAggregationController(Optional<DataSize> maxPartialAggregationMemorySize, AggregationNode.Step step, Session session)
{
return maxPartialAggregationMemorySize.isPresent() && step.isOutputPartial() && isAdaptivePartialAggregationEnabled(session) ?
Expand Down