Skip to content

Commit

Permalink
[native] Support partitioned writer scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Dec 20, 2024
1 parent 74c3c46 commit 0720f17
Show file tree
Hide file tree
Showing 47 changed files with 572 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@
import static com.facebook.presto.spi.MaterializedViewStatus.MaterializedViewState.NOT_MATERIALIZED;
import static com.facebook.presto.spi.MaterializedViewStatus.MaterializedViewState.PARTIALLY_MATERIALIZED;
import static com.facebook.presto.spi.MaterializedViewStatus.MaterializedViewState.TOO_MANY_PARTITIONS_MISSING;
import static com.facebook.presto.spi.PartitionedTableWritePolicy.MULTIPLE_WRITERS_PER_PARTITION_ALLOWED;
import static com.facebook.presto.spi.PartitionedTableWritePolicy.SINGLE_WRITER_PER_PARTITION_REQUIRED;
import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
Expand Down Expand Up @@ -2995,7 +2997,7 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(session, table);
if (!hiveBucketHandle.isPresent()) {
if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()) {
if (table.getPartitionColumns().isEmpty()) {
return Optional.empty();
}

Expand All @@ -3010,7 +3012,10 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio
.map(Column::getName)
.collect(toList());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
return Optional.of(new ConnectorNewTableLayout(
partitioningHandle,
partitionedBy,
isShufflePartitionedColumnsForTableWriteEnabled(session) ? SINGLE_WRITER_PER_PARTITION_REQUIRED : MULTIPLE_WRITERS_PER_PARTITION_ALLOWED));
}
HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty()
.orElseThrow(() -> new NoSuchElementException("Bucket property should be set"));
Expand Down Expand Up @@ -3055,7 +3060,7 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
if (!bucketProperty.isPresent()) {
List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || partitionedBy.isEmpty()) {
if (partitionedBy.isEmpty()) {
return Optional.empty();
}

Expand All @@ -3074,7 +3079,10 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
.collect(toList()),
OptionalInt.empty());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
return Optional.of(new ConnectorNewTableLayout(
partitioningHandle,
partitionedBy,
isShufflePartitionedColumnsForTableWriteEnabled(session) ? SINGLE_WRITER_PER_PARTITION_REQUIRED : MULTIPLE_WRITERS_PER_PARTITION_ALLOWED));
}
checkArgument(bucketProperty.get().getBucketFunctionType().equals(BucketFunctionType.HIVE_COMPATIBLE),
"bucketFunctionType is expected to be HIVE_COMPATIBLE, got: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2296,6 +2296,7 @@ public void testBucketPruning()
{
Session session = getSession();
QueryRunner queryRunner = getQueryRunner();

queryRunner.execute("CREATE TABLE orders_bucketed WITH (bucket_count = 11, bucketed_by = ARRAY['orderkey']) AS " +
"SELECT * FROM orders");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ public final class SystemSessionProperties
public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled";
public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding";
public static final String NATIVE_ENFORCE_JOIN_BUILD_INPUT_PARTITION = "native_enforce_join_build_input_partition";
public static final String NATIVE_EXECUTION_SCALE_WRITER_THREADS_ENABLED = "native_execution_scale_writer_threads_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -1841,7 +1842,11 @@ public SystemSessionProperties(
SINGLE_NODE_EXECUTION_ENABLED,
"Enable single node execution",
featuresConfig.isSingleNodeExecutionEnabled(),
false));
false),
booleanProperty(NATIVE_EXECUTION_SCALE_WRITER_THREADS_ENABLED,
"Enable automatic scaling of writer threads",
featuresConfig.isNativeExecutionScaleWritersThreadsEnabled(),
!featuresConfig.isNativeExecutionEnabled()));
}

public static boolean isSpoolingOutputBufferEnabled(Session session)
Expand Down Expand Up @@ -3132,4 +3137,9 @@ public static int getMinColumnarEncodingChannelsToPreferRowWiseEncoding(Session
{
return session.getSystemProperty(NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, Integer.class);
}

/**
 * Reads the {@code native_execution_scale_writer_threads_enabled} session property,
 * which controls automatic scaling of writer threads in native execution.
 */
public static boolean isNativeExecutionScaleWritersThreadsEnabled(Session session)
{
    Boolean enabled = session.getSystemProperty(NATIVE_EXECUTION_SCALE_WRITER_THREADS_ENABLED, Boolean.class);
    return enabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
outputs,
Optional.empty(),
false,
false,
COLUMNAR,
Optional.empty());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ public class FeaturesConfig

private boolean prestoSparkExecutionEnvironment;
private boolean singleNodeExecutionEnabled;
private boolean nativeExecutionScaleWritersThreadsEnabled;

public enum PartitioningPrecisionStrategy
{
Expand Down Expand Up @@ -2889,4 +2890,16 @@ public FeaturesConfig setSingleNodeExecutionEnabled(boolean singleNodeExecutionE
this.singleNodeExecutionEnabled = singleNodeExecutionEnabled;
return this;
}

// Reports whether writer-thread scaling for native execution is turned on
// (backs the native-execution-scale-writer-threads-enabled config property).
public boolean isNativeExecutionScaleWritersThreadsEnabled()
{
    return this.nativeExecutionScaleWritersThreadsEnabled;
}

/**
 * Enables or disables automatic scaling of writer threads for native execution.
 * Bound to the {@code native-execution-scale-writer-threads-enabled} config property
 * via the {@code @Config} annotation.
 *
 * @return this {@code FeaturesConfig} instance, for call chaining
 */
@Config("native-execution-scale-writer-threads-enabled")
public FeaturesConfig setNativeExecutionScaleWritersThreadsEnabled(boolean enabled)
{
    nativeExecutionScaleWritersThreadsEnabled = enabled;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<FragmentProper
@Override
public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<FragmentProperties> context)
{
if (node.getTablePartitioningScheme().isPresent()) {
if (node.isSingleWriterPerPartitionRequired()) {
context.get().setDistribution(node.getTablePartitioningScheme().get().getPartitioning().getHandle(), metadata, session);
}
return context.defaultRewrite(node, context.get());
Expand Down Expand Up @@ -292,6 +292,7 @@ public PlanNode visitExchange(ExchangeNode exchange, RewriteContext<FragmentProp
private PlanNode createRemoteStreamingExchange(ExchangeNode exchange, RewriteContext<FragmentProperties> context)
{
checkArgument(exchange.getScope() == REMOTE_STREAMING, "Unexpected exchange scope: %s", exchange.getScope());
checkArgument(!exchange.getPartitioningScheme().isScaleWriters(), "task scaling for partitioned tables is not yet supported");

PartitioningScheme partitioningScheme = exchange.getPartitioningScheme();

Expand Down Expand Up @@ -338,6 +339,8 @@ private PlanNode createRemoteMaterializedExchange(ExchangeNode exchange, Rewrite
checkArgument(exchange.getType() == REPARTITION, "Unexpected exchange type: %s", exchange.getType());
checkArgument(exchange.getScope() == REMOTE_MATERIALIZED, "Unexpected exchange scope: %s", exchange.getScope());

checkArgument(!exchange.getPartitioningScheme().isScaleWriters(), "task scaling for partitioned tables is not yet supported");

PartitioningScheme partitioningScheme = exchange.getPartitioningScheme();

PartitioningHandle partitioningHandle = partitioningScheme.getPartitioning().getHandle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2680,7 +2680,7 @@ public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanCont
public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context)
{
// Set table writer count
if (node.getTablePartitioningScheme().isPresent()) {
if (node.isSingleWriterPerPartitionRequired()) {
context.setDriverInstanceCount(getTaskPartitionedWriterCount(session));
}
else {
Expand Down Expand Up @@ -3070,6 +3070,8 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan

private PhysicalOperation createLocalExchange(ExchangeNode node, LocalExecutionPlanContext context)
{
checkArgument(!node.getPartitioningScheme().isScaleWriters(), "thread scaling for partitioned tables is only supported by native execution");

int driverInstanceCount;
if (node.getType() == ExchangeNode.Type.GATHER) {
driverInstanceCount = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.metadata.MetadataUtil.getConnectorIdOrThrow;
import static com.facebook.presto.metadata.MetadataUtil.toSchemaTableName;
import static com.facebook.presto.spi.PartitionedTableWritePolicy.MULTIPLE_WRITERS_PER_PARTITION_ALLOWED;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.plan.AggregationNode.singleGroupingSet;
import static com.facebook.presto.spi.plan.LimitNode.Step.FINAL;
Expand Down Expand Up @@ -633,7 +634,8 @@ private static Optional<PartitioningScheme> getPartitioningSchemeForTableWrite(O

partitioningScheme = Optional.of(new PartitioningScheme(
Partitioning.create(tableLayout.get().getPartitioning(), partitionFunctionArguments),
outputLayout));
outputLayout,
tableLayout.get().getWriterPolicy() == MULTIPLE_WRITERS_PER_PARTITION_ALLOWED));
}
return partitioningScheme;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ private static SubPlan reassignPartitioningHandleIfNecessaryHelper(Metadata meta
outputPartitioningScheme.getOutputLayout(),
outputPartitioningScheme.getHashColumn(),
outputPartitioningScheme.isReplicateNullsAndAny(),
outputPartitioningScheme.isScaleWriters(),
outputPartitioningScheme.getEncoding(),
outputPartitioningScheme.getBucketToPartition()),
fragment.getStageExecutionDescriptor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ private PlanNode pushPartial(AggregationNode aggregation, ExchangeNode exchange,
aggregationOutputs,
exchange.getPartitioningScheme().getHashColumn(),
exchange.getPartitioningScheme().isReplicateNullsAndAny(),
exchange.getPartitioningScheme().isScaleWriters(),
exchange.getPartitioningScheme().getEncoding(),
exchange.getPartitioningScheme().getBucketToPartition());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public Result apply(ProjectNode project, Captures captures, Context context)
outputBuilder.build(),
exchange.getPartitioningScheme().getHashColumn(),
exchange.getPartitioningScheme().isReplicateNullsAndAny(),
exchange.getPartitioningScheme().isScaleWriters(),
exchange.getPartitioningScheme().getEncoding(),
exchange.getPartitioningScheme().getBucketToPartition());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public Result apply(ExchangeNode node, Captures captures, Context context)
removeVariable(partitioningScheme.getOutputLayout(), assignUniqueId.getIdVariable()),
partitioningScheme.getHashColumn(),
partitioningScheme.isReplicateNullsAndAny(),
partitioningScheme.isScaleWriters(),
partitioningScheme.getEncoding(),
partitioningScheme.getBucketToPartition()),
ImmutableList.of(assignUniqueId.getSource()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public Result apply(ExchangeNode node, Captures captures, Context context)
outputLayout,
partitioningScheme.getHashColumn(),
partitioningScheme.isReplicateNullsAndAny(),
partitioningScheme.isScaleWriters(),
partitioningScheme.getEncoding(),
partitioningScheme.getBucketToPartition()),
ImmutableList.of(groupIdNode.getSource()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class PushTableWriteThroughUnion
// guaranteed regardless of this optimizer. The level of local parallelism will be
// determined by LocalExecutionPlanner separately, and shouldn't be a concern of
// this optimizer.
.matching(tableWriter -> !tableWriter.getTablePartitioningScheme().isPresent())
.matching(tableWriter -> !tableWriter.isSingleWriterPerPartitionRequired())
.with(source().matching(union().capturedAs(CHILD)));

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,13 +737,18 @@ public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProper
PlanWithProperties source = accept(node.getSource(), preferredProperties);

Optional<PartitioningScheme> shufflePartitioningScheme = node.getTablePartitioningScheme();
if (!shufflePartitioningScheme.isPresent()) {
if (!node.isSingleWriterPerPartitionRequired()) {
// prefer scale writers if single writer per partition is not required
// TODO: take into account partitioning scheme in scale writer tasks implementation
if (scaleWriters) {
shufflePartitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), source.getNode().getOutputVariables()));
}
else if (redistributeWrites) {
shufflePartitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(FIXED_ARBITRARY_DISTRIBUTION, ImmutableList.of()), source.getNode().getOutputVariables()));
}
else {
return rebaseAndDeriveProperties(node, source);
}
}

if (shufflePartitioningScheme.isPresent() &&
Expand Down Expand Up @@ -1118,6 +1123,7 @@ public PlanWithProperties visitSemiJoin(SemiJoinNode node, PreferredProperties p
filteringSource.getNode().getOutputVariables(),
Optional.empty(),
true,
false,
COLUMNAR,
Optional.empty())),
filteringSource.getProperties());
Expand Down Expand Up @@ -1160,6 +1166,7 @@ public PlanWithProperties visitSemiJoin(SemiJoinNode node, PreferredProperties p
filteringSource.getNode().getOutputVariables(),
Optional.empty(),
true,
false,
COLUMNAR,
Optional.empty())),
filteringSource.getProperties());
Expand Down Expand Up @@ -1336,6 +1343,7 @@ public PlanWithProperties visitUnion(UnionNode node, PreferredProperties parentP
source.getNode().getOutputVariables(),
Optional.empty(),
nullsAndAnyReplicated,
false,
COLUMNAR,
Optional.empty())),
source.getProperties());
Expand Down
Loading

0 comments on commit 0720f17

Please sign in to comment.