Fix the streamPartitioning and localProperty for table scan
Currently we add the sorted-by columns to both streamPartitionColumns and localProperties, and we do so
only when the bucket columns are the same as a prefix of the sort columns. There are two issues:
1. The condition is too strict and eliminates some cases where we could also expose those properties.
2. Adding the sorted-by columns as streamPartitionColumns also tightens the condition. For example, for a
table that is bucketed by A and sorted by <A, B>, using <A, B> as the streamPartitionColumns is a stricter
requirement than necessary; we should only use A.

Instead we now:
Add the bucketed-by columns to streamPartitionColumns
Add the sorted-by columns to localProperty
kewang1024 authored and rschlussel committed May 2, 2022
1 parent 547cb4d commit 2a040a2
Showing 3 changed files with 199 additions and 34 deletions.
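A minimal, standalone sketch of the example from the commit message (plain Java with hypothetical names, not the Presto SPI), showing why exposing the sorted-by columns as stream partitioning over-constrains a table bucketed by A and sorted by <A, B>:

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TableScanPropertiesSketch
{
    public static void main(String[] args)
    {
        List<String> bucketedBy = List.of("A");
        List<String> sortedBy = List.of("A", "B");

        // Old behavior: only expose the properties when the bucket columns are a
        // prefix of the sort columns, and use the sorted-by columns for stream partitioning.
        boolean bucketsArePrefixOfSort = bucketedBy.size() <= sortedBy.size()
                && bucketedBy.containsAll(sortedBy.subList(0, bucketedBy.size()));
        Set<String> oldStreamPartitioning = new LinkedHashSet<>();
        if (bucketsArePrefixOfSort) {
            oldStreamPartitioning.addAll(sortedBy); // {A, B}: stricter than necessary
        }

        // New behavior: stream partitioning comes from the bucketed-by columns only,
        // and the sorted-by columns are exposed as the within-split (local) ordering.
        Set<String> newStreamPartitioning = new LinkedHashSet<>(bucketedBy); // {A}
        List<String> localOrdering = new ArrayList<>(sortedBy);              // [A, B]

        System.out.println("old stream partitioning: " + oldStreamPartitioning); // [A, B]
        System.out.println("new stream partitioning: " + newStreamPartitioning); // [A]
        System.out.println("local ordering:          " + localOrdering);         // [A, B]
    }
}

Exposing only A matches how the data is actually bucketed, so a GROUP BY A can rely on all rows with the same A landing in the same stream, while the sort order <A, B> is still available as a local property.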
@@ -2820,36 +2820,33 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
predicate = createPredicate(partitionColumns, partitions);
}

// Expose ordering property of the table.
ImmutableList.Builder<LocalProperty<ColumnHandle>> localProperties = ImmutableList.builder();
// Expose ordering property of the table when order based execution is enabled.
ImmutableList.Builder<LocalProperty<ColumnHandle>> localPropertyBuilder = ImmutableList.builder();
Optional<Set<ColumnHandle>> streamPartitionColumns = Optional.empty();
if (table.getStorage().getBucketProperty().isPresent() && !table.getStorage().getBucketProperty().get().getSortedBy().isEmpty()) {
if (table.getStorage().getBucketProperty().isPresent()
&& !table.getStorage().getBucketProperty().get().getSortedBy().isEmpty()
&& isOrderBasedExecutionEnabled(session)) {
ImmutableSet.Builder<ColumnHandle> streamPartitionColumnsBuilder = ImmutableSet.builder();
Map<String, ColumnHandle> columnHandles = hiveColumnHandles(table).stream()
.collect(toImmutableMap(HiveColumnHandle::getName, identity()));

// streamPartitioningColumns is how we partition the data across splits.
// localProperty is how we partition the data within a split.
// 1. add partition columns to streamPartitionColumns
partitionColumns.forEach(streamPartitionColumnsBuilder::add);

// 2. add sorted columns to streamPartitionColumns and localProperties
HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty().get();
Map<String, ColumnHandle> columnHandles = hiveColumnHandles(table).stream()
.collect(toImmutableMap(HiveColumnHandle::getName, identity()));
bucketProperty.getSortedBy().forEach(sortingColumn -> {
ColumnHandle columnHandle = columnHandles.get(sortingColumn.getColumnName());
localProperties.add(new SortingProperty<>(columnHandle, sortingColumn.getOrder().getSortOrder()));
// 1. add partition columns and bucketed-by columns to streamPartitionColumns
// when order based execution is enabled, splitting is disabled and data is sharded across splits when table is bucketed.
partitionColumns.forEach(streamPartitionColumnsBuilder::add);
table.getStorage().getBucketProperty().get().getBucketedBy().forEach(bucketedByColumn -> {
ColumnHandle columnHandle = columnHandles.get(bucketedByColumn);
streamPartitionColumnsBuilder.add(columnHandle);
});

// We currently only set streamPartitionColumns when it enables streaming aggregation and also it's eligible to enable streaming aggregation
// 1. When the bucket columns are the same as the prefix of the sort columns
// 2. When all rows of the same value group are guaranteed to be in the same split. We disable splitting a file when isStreamingAggregationEnabled is true to make sure the property is guaranteed.
List<String> sortColumns = bucketProperty.getSortedBy().stream().map(SortingColumn::getColumnName).collect(toImmutableList());
if (bucketProperty.getBucketedBy().size() <= sortColumns.size()
&& bucketProperty.getBucketedBy().containsAll(sortColumns.subList(0, bucketProperty.getBucketedBy().size()))
&& isOrderBasedExecutionEnabled(session)) {
streamPartitionColumns = Optional.of(streamPartitionColumnsBuilder.build());
}
// 2. add sorted-by columns to localPropertyBuilder
table.getStorage().getBucketProperty().get().getSortedBy().forEach(sortingColumn -> {
ColumnHandle columnHandle = columnHandles.get(sortingColumn.getColumnName());
localPropertyBuilder.add(new SortingProperty<>(columnHandle, sortingColumn.getOrder().getSortOrder()));
});
streamPartitionColumns = Optional.of(streamPartitionColumnsBuilder.build());
}

return new ConnectorTableLayout(
@@ -2859,7 +2856,7 @@ && isOrderBasedExecutionEnabled(session)) {
tablePartitioning,
streamPartitionColumns,
discretePredicates,
localProperties.build(),
localPropertyBuilder.build(),
Optional.of(hiveLayoutHandle.getRemainingPredicate()));
}
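The test changes below check when segmented or streaming aggregation can be planned on top of these properties. As a rough mental model only (the names below are hypothetical and this is not Presto's actual property-derivation code), the eligibility check the tests imply is: the grouping keys must cover every stream partition column, and the segmented/streaming keys are the longest prefix of the local sort order made up of grouping keys.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch, not Presto's planner code.
public class SegmentedAggregationEligibilitySketch
{
    static List<String> segmentedStreamingKeys(Set<String> groupingKeys, Set<String> streamPartitioning, List<String> localOrdering)
    {
        // Every stream partition column must be a grouping key; otherwise rows
        // of a single group could be spread over several streams.
        if (!groupingKeys.containsAll(streamPartitioning)) {
            return List.of();
        }
        // Segment on the longest prefix of the within-split sort order that
        // consists of grouping keys.
        List<String> prefix = new ArrayList<>();
        for (String column : localOrdering) {
            if (!groupingKeys.contains(column)) {
                break;
            }
            prefix.add(column);
        }
        return prefix;
    }

    public static void main(String[] args)
    {
        // bucketed by custkey, sorted by <name, custkey>, GROUP BY custkey -> [] (no segmented aggregation)
        System.out.println(segmentedStreamingKeys(Set.of("custkey"), Set.of("custkey"), List.of("name", "custkey")));
        // bucketed by <custkey, name>, sorted by <custkey>, GROUP BY name, custkey -> [custkey]
        System.out.println(segmentedStreamingKeys(Set.of("name", "custkey"), Set.of("custkey", "name"), List.of("custkey")));
        // two partitions scanned, so the partition key ds joins the stream partitioning;
        // GROUP BY custkey alone -> [] (no streaming aggregation)
        System.out.println(segmentedStreamingKeys(Set.of("custkey"), Set.of("custkey", "ds"), List.of("custkey")));
    }
}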

@@ -49,21 +49,51 @@ protected QueryRunner createQueryRunner()
Optional.empty());
}

@Test
public void testSortedbyKeysPrefixNotASubsetOfGroupbyKeys()
{
QueryRunner queryRunner = getQueryRunner();

try {
queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer0 WITH ( \n" +
" bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" +
" sorted_by = ARRAY['name', 'custkey'], partitioned_by=array['ds'], \n" +
" format = 'DWRF' ) AS \n" +
"SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

// can't enable segmented aggregation
assertPlan(orderBasedExecutionEnabled(),
"SELECT custkey, count(name) FROM test_segmented_aggregation_customer0 \n" +
"WHERE ds = '2021-07-11' GROUP BY 1",
anyTree(aggregation(
singleGroupingSet("custkey"),
ImmutableMap.of(Optional.of("count"), functionCall("count", ImmutableList.of("name"))),
ImmutableList.of(), // no segmented streaming
ImmutableMap.of(),
Optional.empty(),
SINGLE,
tableScan("test_segmented_aggregation_customer0", ImmutableMap.of("custkey", "custkey", "name", "name")))));
}
finally {
queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer0");
}
}

@Test
public void testAndSortedByKeysArePrefixOfGroupbyKeys()
{
QueryRunner queryRunner = getQueryRunner();

try {
queryRunner.execute("CREATE TABLE test_segmented_streaming_customer WITH ( \n" +
queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer WITH ( \n" +
" bucket_count = 4, bucketed_by = ARRAY['custkey', 'name'], \n" +
" sorted_by = ARRAY['custkey', 'name'], partitioned_by=array['ds'], \n" +
" format = 'DWRF' ) AS \n" +
"SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

assertPlan(
orderBasedExecutionEnabled(),
"SELECT custkey, name, nationkey, COUNT(*) FROM test_segmented_streaming_customer \n" +
"SELECT custkey, name, nationkey, COUNT(*) FROM test_segmented_aggregation_customer \n" +
"WHERE ds = '2021-07-11' GROUP BY 1, 2, 3",
anyTree(aggregation(
singleGroupingSet("custkey", "name", "nationkey"),
@@ -72,14 +102,72 @@ public void testAndSortedByKeysArePrefixOfGroupbyKeys()
ImmutableMap.of(),
Optional.empty(),
SINGLE,
tableScan("test_segmented_streaming_customer", ImmutableMap.of("custkey", "custkey", "name", "name", "nationkey", "nationkey")))));
tableScan("test_segmented_aggregation_customer", ImmutableMap.of("custkey", "custkey", "name", "name", "nationkey", "nationkey")))));
}
finally {
queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer");
}
}

@Test
public void testSortedByPrefixOfBucketedKeys()
{
QueryRunner queryRunner = getQueryRunner();

try {
queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer2 WITH ( \n" +
" bucket_count = 4, bucketed_by = ARRAY['custkey', 'name'], \n" +
" sorted_by = ARRAY['custkey'], partitioned_by=array['ds'], \n" +
" format = 'DWRF' ) AS \n" +
"SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

// can enable segmented aggregation
assertPlan(orderBasedExecutionEnabled(),
"SELECT name, custkey, COUNT(*) FROM test_segmented_aggregation_customer2 \n" +
"WHERE ds = '2021-07-11' GROUP BY 1, 2",
anyTree(aggregation(
singleGroupingSet("name", "custkey"),
ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())),
ImmutableList.of("custkey"), // segmented aggregation
ImmutableMap.of(),
Optional.empty(),
SINGLE,
tableScan("test_segmented_aggregation_customer2", ImmutableMap.of("name", "name", "custkey", "custkey")))));
}
finally {
queryRunner.execute("DROP TABLE IF EXISTS test_segmented_streaming_customer");
queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer2");
}
}

// todo: add test when group-by keys and prefix of sorted-by keys share the same elements
@Test
public void testGroupByKeysShareElementsAsSortedByKeysPrefix()
{
QueryRunner queryRunner = getQueryRunner();

try {
queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer_share_elements WITH ( \n" +
" bucket_count = 4, bucketed_by = ARRAY['custkey', 'name', 'nationkey'], \n" +
" sorted_by = ARRAY['custkey', 'phone'], partitioned_by=array['ds'], \n" +
" format = 'DWRF' ) AS \n" +
"SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

// can enable segmented aggregation
assertPlan(orderBasedExecutionEnabled(),
"SELECT name, custkey, nationkey, COUNT(*) FROM test_segmented_aggregation_customer_share_elements \n" +
"WHERE ds = '2021-07-11' GROUP BY 1, 2, 3",
anyTree(aggregation(
singleGroupingSet("name", "custkey", "nationkey"),
ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())),
ImmutableList.of("custkey"), // segmented aggregation
ImmutableMap.of(),
Optional.empty(),
SINGLE,
tableScan("test_segmented_aggregation_customer_share_elements", ImmutableMap.of("name", "name", "custkey", "custkey", "nationkey", "nationkey")))));
}
finally {
queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer_share_elements");
}
}

private Session orderBasedExecutionEnabled()
{
@@ -136,11 +136,23 @@ public void testBucketedAndSortedByDifferentKeys()
"SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

// can't enable stream
// since it's bucketed by custkey, the local exchange would be removed when order based execution is enabled
assertPlan(
orderBasedExecutionEnabled(),
"SELECT custkey, COUNT(*) FROM test_customer3 \n" +
"WHERE ds = '2021-07-11' GROUP BY 1",
aggregationPlanWithNoStreaming("test_customer3", false, "custkey"));
node(
OutputNode.class,
node(
ExchangeNode.class,
aggregation(
singleGroupingSet("custkey"),
ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())),
ImmutableList.of(), // non-streaming
ImmutableMap.of(),
Optional.empty(),
SINGLE,
tableScan("test_customer3", ImmutableMap.of("custkey", "custkey"))))));

// can't enable stream
assertPlan(
@@ -246,6 +258,41 @@ public void testGroupbySameKeysOfSortedbyKeys()
}
}

@Test
public void testGroupbySameKeysOfSortedbyKeysWithReverseOrder()
{
QueryRunner queryRunner = getQueryRunner();

try {
queryRunner.execute("CREATE TABLE test_customer6_2 WITH ( \n" +
" bucket_count = 4, bucketed_by = ARRAY['custkey', 'name'], \n" +
" sorted_by = ARRAY['custkey', 'name'], partitioned_by=array['ds'], \n" +
" format = 'DWRF' ) AS \n" +
"SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

// can enable streaming aggregation
assertPlan(orderBasedExecutionEnabled(),
"SELECT name, custkey, COUNT(*) FROM test_customer6_2 \n" +
"WHERE ds = '2021-07-11' GROUP BY 1, 2",
node(
OutputNode.class,
node(
ExchangeNode.class,
aggregation(
singleGroupingSet("custkey", "name"),
ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())),
ImmutableList.of("custkey", "name"), // streaming
ImmutableMap.of(),
Optional.empty(),
SINGLE,
tableScan("test_customer6_2", ImmutableMap.of("custkey", "custkey", "name", "name"))))));
}
finally {
queryRunner.execute("DROP TABLE IF EXISTS test_customer6_2");
}
}

@Test
public void testGroupbyKeysNotPrefixOfSortedKeys()
{
QueryRunner queryRunner = getQueryRunner();
@@ -269,7 +316,40 @@ public void testGroupbyKeysNotPrefixOfSortedKeys()
}
}

// todo: add streaming aggregation support when grouping keys are a prefix of sorted keys
@Test
public void testGroupbyKeysPrefixOfSortedKeys()
{
QueryRunner queryRunner = getQueryRunner();

try {
queryRunner.execute("CREATE TABLE test_customer9 WITH ( \n" +
" bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" +
" sorted_by = ARRAY['custkey', 'name'], partitioned_by=array['ds'], \n" +
" format = 'DWRF' ) AS \n" +
"SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

// can enable streaming aggregation
assertPlan(
orderBasedExecutionEnabled(),
"SELECT custkey, COUNT(*) FROM test_customer9 \n" +
"WHERE ds = '2021-07-11' GROUP BY 1",
node(
OutputNode.class,
node(
ExchangeNode.class,
aggregation(
singleGroupingSet("custkey"),
ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())),
ImmutableList.of("custkey"), // streaming
ImmutableMap.of(),
Optional.empty(),
SINGLE,
tableScan("test_customer9", ImmutableMap.of("custkey", "custkey"))))));
}
finally {
queryRunner.execute("DROP TABLE IF EXISTS test_customer9");
}
}

// Partition keys
@Test
@@ -278,25 +358,25 @@ public void testQueryingMultiplePartitions()
QueryRunner queryRunner = getQueryRunner();

try {
queryRunner.execute("CREATE TABLE test_customer9 WITH ( \n" +
queryRunner.execute("CREATE TABLE test_customer10 WITH ( \n" +
" bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" +
" sorted_by = ARRAY['custkey'], partitioned_by=array['ds'], \n" +
" format = 'DWRF' ) AS \n" +
"SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");
queryRunner.execute("INSERT INTO test_customer9 \n" +
queryRunner.execute("INSERT INTO test_customer10 \n" +
"SELECT *, '2021-07-12' as ds FROM tpch.sf1.customer LIMIT 1000");

// can't enable streaming aggregation when querying multiple partitions without grouping by partition keys
assertPlan(
orderBasedExecutionEnabled(),
"SELECT custkey, COUNT(*) FROM test_customer9 \n" +
"SELECT custkey, COUNT(*) FROM test_customer10 \n" +
"WHERE ds = '2021-07-11' or ds = '2021-07-12' GROUP BY 1",
aggregationPlanWithNoStreaming("test_customer9", false, "custkey"));
aggregationPlanWithNoStreaming("test_customer10", false, "custkey"));

//todo: add streaming aggregation support when grouping keys contain all of the partition keys
}
finally {
queryRunner.execute("DROP TABLE IF EXISTS test_customer9");
queryRunner.execute("DROP TABLE IF EXISTS test_customer10");
}
}
