From 20fcd29ed9c0b4cd8b00d297a1271b253fe81862 Mon Sep 17 00:00:00 2001 From: Zac Blanco Date: Wed, 22 Jan 2025 17:05:30 -0800 Subject: [PATCH] [Iceberg] Session property for target split size --- .../src/main/sphinx/connector/iceberg.rst | 13 +++- .../iceberg/IcebergAbstractMetadata.java | 5 ++ .../iceberg/IcebergSessionProperties.java | 12 ++++ .../presto/iceberg/IcebergSplitManager.java | 6 +- .../presto/iceberg/IcebergSplitSource.java | 29 +++++---- .../iceberg/IcebergTableProperties.java | 11 ++++ .../facebook/presto/iceberg/IcebergUtil.java | 27 ++++++++ .../changelog/ChangelogSplitSource.java | 21 +++--- .../IcebergDistributedSmokeTestBase.java | 10 +++ .../iceberg/TestIcebergSplitManager.java | 65 +++++++++++++++++++ .../iceberg/TestIcebergSystemTables.java | 17 +++-- .../presto/iceberg/TestIcebergUtil.java | 9 +++ .../nessie/TestIcebergSystemTablesNessie.java | 14 ++-- .../TestSetTablePropertyProcedure.java | 13 ++-- .../TestIcebergSmokeRestNestedNamespace.java | 4 ++ 15 files changed, 210 insertions(+), 46 deletions(-) diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index d06ae500e6c97..87c9b1fb7d0bf 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -357,9 +357,9 @@ connector using a WITH clause: The following table properties are available, which are specific to the Presto Iceberg connector: -======================================= =============================================================== ============ +======================================= =============================================================== ========================= Property Name Description Default -======================================= =============================================================== ============ +======================================= =============================================================== ========================= 
``format`` Optionally specifies the format of table data files, ``PARQUET`` either ``PARQUET`` or ``ORC``. @@ -388,7 +388,11 @@ Property Name Description ``metrics_max_inferred_column`` Optionally specifies the maximum number of columns for which ``100`` metrics are collected. -======================================= =============================================================== ============ + +``read.split.target-size`` The target size for an individual split when generating splits ``134217728`` (128MB) + for a table scan. Generated splits may still be larger or + smaller than this value. Must be specified in bytes. +======================================= =============================================================== ========================= The table definition below specifies format ``ORC``, partitioning by columns ``c1`` and ``c2``, and a file system location of ``s3://test_bucket/test_schema/test_table``: @@ -421,6 +425,9 @@ Property Name Description ``iceberg.rows_for_metadata_optimization_threshold`` Overrides the behavior of the connector property ``iceberg.rows-for-metadata-optimization-threshold`` in the current session. +``iceberg.target_split_size`` Overrides the target split size, in bytes, for all tables in a query. + Set to 0 to use the value in each Iceberg table's + ``read.split.target-size`` property. 
===================================================== ====================================================================== Caching Support diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index c6855be88a022..41842d2bbd26f 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -201,6 +201,7 @@ import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP; import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL; import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; import static org.apache.iceberg.TableProperties.UPDATE_MODE; public abstract class IcebergAbstractMetadata @@ -719,6 +720,7 @@ protected ImmutableMap createMetadataProperties(Table icebergTab properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable)); properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable)); properties.put(METRICS_MAX_INFERRED_COLUMN, IcebergUtil.getMetricsMaxInferredColumn(icebergTable)); + properties.put(SPLIT_SIZE, IcebergUtil.getSplitSize(icebergTable)); SortOrder sortOrder = icebergTable.sortOrder(); // TODO: Support sort column transforms (https://github.com/prestodb/presto/issues/24250) @@ -1127,6 +1129,9 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta case COMMIT_RETRIES: updateProperties.set(TableProperties.COMMIT_NUM_RETRIES, String.valueOf(entry.getValue())); break; + case SPLIT_SIZE: + updateProperties.set(TableProperties.SPLIT_SIZE, entry.getValue().toString()); + break; default: throw new PrestoException(NOT_SUPPORTED, "Updating property " + entry.getKey() + " is 
not supported currently"); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java index 57f954801f2f7..e82109fc0f8df 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java @@ -40,6 +40,7 @@ import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty; import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty; import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty; +import static com.facebook.presto.spi.session.PropertyMetadata.longProperty; import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty; public final class IcebergSessionProperties @@ -65,6 +66,7 @@ public final class IcebergSessionProperties public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight"; public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold"; public static final String STATISTICS_KLL_SKETCH_K_PARAMETER = "statistics_kll_sketch_k_parameter"; + public static final String TARGET_SPLIT_SIZE = "target_split_size"; private final List> sessionProperties; @@ -189,6 +191,11 @@ public IcebergSessionProperties( .add(integerProperty(STATISTICS_KLL_SKETCH_K_PARAMETER, "The K parameter for the Apache DataSketches KLL sketch when computing histogram statistics", icebergConfig.getStatisticsKllSketchKParameter(), + false)) + .add(longProperty( + TARGET_SPLIT_SIZE, + "The target split size. 
Set to 0 to use the iceberg table's read.split.target-size property", + 0L, false)); nessieConfig.ifPresent((config) -> propertiesBuilder @@ -323,4 +330,9 @@ public static int getStatisticsKllSketchKParameter(ConnectorSession session) { return session.getProperty(STATISTICS_KLL_SKETCH_K_PARAMETER, Integer.class); } + + public static Long getTargetSplitSize(ConnectorSession session) + { + return session.getProperty(TARGET_SPLIT_SIZE, Long.class); + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java index d824accafcab5..7b1415a4beced 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java @@ -31,7 +31,6 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.util.SnapshotUtil; -import org.apache.iceberg.util.TableScanUtil; import org.weakref.jmx.Managed; import org.weakref.jmx.Nested; @@ -41,7 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor; import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; -import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG; import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES; import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable; @@ -95,7 +93,7 @@ public ConnectorSplitSource getSplits( IncrementalChangelogScan scan = icebergTable.newIncrementalChangelogScan() .fromSnapshotExclusive(fromSnapshot) .toSnapshot(toSnapshot); - return new ChangelogSplitSource(session, typeManager, icebergTable, scan, scan.targetSplitSize()); + return new ChangelogSplitSource(session, typeManager, icebergTable, scan); } else if 
(table.getIcebergTableName().getTableType() == EQUALITY_DELETES) { CloseableIterable deleteFiles = IcebergUtil.getDeleteFiles(icebergTable, @@ -117,8 +115,6 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) { IcebergSplitSource splitSource = new IcebergSplitSource( session, tableScan, - TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()), - getMinimumAssignedSplitWeight(session), getMetadataColumnConstraints(layoutHandle.getValidPredicate())); return splitSource; } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java index d6e97230ae6fd..bf71ba005fdc5 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java @@ -20,13 +20,13 @@ import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.SplitWeight; import com.facebook.presto.spi.connector.ConnectorPartitionHandle; +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; import com.google.common.collect.ImmutableList; import com.google.common.io.Closer; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.TableScan; -import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import java.io.IOException; @@ -39,40 +39,45 @@ import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy; import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat; +import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber; import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys; +import static 
com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize; import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates; import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterators.limit; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.iceberg.util.TableScanUtil.splitFiles; public class IcebergSplitSource implements ConnectorSplitSource { private CloseableIterator fileScanTaskIterator; - private final TableScan tableScan; private final Closer closer = Closer.create(); private final double minimumAssignedSplitWeight; - private final ConnectorSession session; + private final long targetSplitSize; + private final NodeSelectionStrategy nodeSelectionStrategy; private final TupleDomain metadataColumnConstraints; public IcebergSplitSource( ConnectorSession session, TableScan tableScan, - CloseableIterable fileScanTaskIterable, - double minimumAssignedSplitWeight, TupleDomain metadataColumnConstraints) { - this.session = requireNonNull(session, "session is null"); - this.tableScan = requireNonNull(tableScan, "tableScan is null"); - this.fileScanTaskIterator = fileScanTaskIterable.iterator(); - this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; + requireNonNull(session, "session is null"); this.metadataColumnConstraints = requireNonNull(metadataColumnConstraints, "metadataColumnConstraints is null"); - closer.register(fileScanTaskIterator); + this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes(); + this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session); + this.nodeSelectionStrategy = getNodeSelectionStrategy(session); + this.fileScanTaskIterator = closer.register( + splitFiles( + closer.register(tableScan.planFiles()), + targetSplitSize) + .iterator()); } @Override @@ 
-130,8 +135,8 @@ private ConnectorSplit toIcebergSplit(FileScanTask task) getPartitionKeys(task), PartitionSpecParser.toJson(spec), partitionData.map(PartitionData::toJson), - getNodeSelectionStrategy(session), - SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)), + nodeSelectionStrategy, + SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / targetSplitSize, minimumAssignedSplitWeight), 1.0)), task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()), Optional.empty(), getDataSequenceNumber(task.file())); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java index d7e763fe462f3..1bb8976176e86 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java @@ -29,6 +29,7 @@ import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType; import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty; import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty; +import static com.facebook.presto.spi.session.PropertyMetadata.longProperty; import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Locale.ENGLISH; @@ -47,6 +48,7 @@ public class IcebergTableProperties public static final String METADATA_PREVIOUS_VERSIONS_MAX = "metadata_previous_versions_max"; public static final String METADATA_DELETE_AFTER_COMMIT = "metadata_delete_after_commit"; public static final String METRICS_MAX_INFERRED_COLUMN = "metrics_max_inferred_column"; + public static final String TARGET_SPLIT_SIZE = TableProperties.SPLIT_SIZE; private static final 
String DEFAULT_FORMAT_VERSION = "2"; private final List> tableProperties; @@ -133,6 +135,10 @@ public IcebergTableProperties(IcebergConfig icebergConfig) false, value -> RowLevelOperationMode.fromName((String) value), RowLevelOperationMode::modeName)) + .add(longProperty(TARGET_SPLIT_SIZE, + "Desired size of split to generate during query scan planning", + TableProperties.SPLIT_SIZE_DEFAULT, + false)) .build(); columnProperties = ImmutableList.of(stringProperty( @@ -210,4 +216,9 @@ public static RowLevelOperationMode getUpdateMode(Map tablePrope { return (RowLevelOperationMode) tableProperties.get(UPDATE_MODE); } + + public static Long getTargetSplitSize(Map tableProperties) + { + return (Long) tableProperties.get(TableProperties.SPLIT_SIZE); + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 1534d77666aae..229b77d61f8f4 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -49,6 +49,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import io.airlift.units.DataSize; import org.apache.iceberg.BaseTable; import org.apache.iceberg.ContentFile; import org.apache.iceberg.ContentScanTask; @@ -61,6 +62,7 @@ import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RowLevelOperationMode; +import org.apache.iceberg.Scan; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SortOrder; @@ -157,6 +159,7 @@ import static com.google.common.collect.Streams.stream; import static io.airlift.slice.Slices.utf8Slice; import static io.airlift.slice.Slices.wrappedBuffer; +import static io.airlift.units.DataSize.succinctBytes; import static 
java.lang.Double.doubleToRawLongBits; import static java.lang.Double.longBitsToDouble; import static java.lang.Double.parseDouble; @@ -195,6 +198,8 @@ import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT; import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; import static org.apache.iceberg.TableProperties.UPDATE_MODE; import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT; import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL; @@ -1176,6 +1181,9 @@ public static Map populateTableProperties(ConnectorTableMetadata Integer metricsMaxInferredColumn = IcebergTableProperties.getMetricsMaxInferredColumn(tableMetadata.getProperties()); propertiesBuilder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS, String.valueOf(metricsMaxInferredColumn)); + + propertiesBuilder.put(SPLIT_SIZE, String.valueOf(IcebergTableProperties.getTargetSplitSize(tableMetadata.getProperties()))); + return propertiesBuilder.build(); } @@ -1286,4 +1294,23 @@ public static String dataLocation(Table icebergTable) } return dataLocation; } + + public static Long getSplitSize(Table table) + { + return Long.parseLong(table.properties() + .getOrDefault(SPLIT_SIZE, + String.valueOf(SPLIT_SIZE_DEFAULT))); + } + + public static DataSize getTargetSplitSize(long sessionValueProperty, long icebergScanTargetSplitSize) + { + return sessionValueProperty == 0 ? 
+ succinctBytes(icebergScanTargetSplitSize) : + succinctBytes(sessionValueProperty); + } + + public static DataSize getTargetSplitSize(ConnectorSession session, Scan scan) + { + return getTargetSplitSize(IcebergSessionProperties.getTargetSplitSize(session), scan.targetSplitSize()); + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java index ad2d8b8decade..0d585393987f6 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java @@ -24,6 +24,7 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SplitWeight; import com.facebook.presto.spi.connector.ConnectorPartitionHandle; +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; import com.google.common.collect.ImmutableList; import com.google.common.io.Closer; import org.apache.iceberg.AddedRowsScanTask; @@ -48,9 +49,11 @@ import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT; +import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber; import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys; +import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize; import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike; import static com.facebook.presto.iceberg.changelog.ChangelogOperation.fromIcebergChangelogOperation; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; @@ -64,23 +67,23 @@ public class 
ChangelogSplitSource private final Closer closer = Closer.create(); private CloseableIterable fileScanTaskIterable; private CloseableIterator fileScanTaskIterator; - private final IncrementalChangelogScan tableScan; private final double minimumAssignedSplitWeight; - private final ConnectorSession session; + private final long targetSplitSize; private final List columnHandles; + private final NodeSelectionStrategy nodeSelectionStrategy; public ChangelogSplitSource( ConnectorSession session, TypeManager typeManager, Table table, - IncrementalChangelogScan tableScan, - double minimumAssignedSplitWeight) + IncrementalChangelogScan tableScan) { - this.session = requireNonNull(session, "session is null"); + requireNonNull(session, "session is null"); requireNonNull(typeManager, "typeManager is null"); this.columnHandles = getColumns(table.schema(), table.spec(), typeManager); - this.tableScan = requireNonNull(tableScan, "tableScan is null"); - this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; + this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session); + this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes(); + this.nodeSelectionStrategy = getNodeSelectionStrategy(session); this.fileScanTaskIterable = closer.register(tableScan.planFiles()); this.fileScanTaskIterator = closer.register(fileScanTaskIterable.iterator()); } @@ -143,8 +146,8 @@ private IcebergSplit splitFromContentScanTask(ContentScanTask task, Ch getPartitionKeys(task), PartitionSpecParser.toJson(spec), partitionData.map(PartitionData::toJson), - getNodeSelectionStrategy(session), - SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)), + nodeSelectionStrategy, + SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / targetSplitSize, minimumAssignedSplitWeight), 1.0)), ImmutableList.of(), Optional.of(new 
ChangelogSplitInfo(fromIcebergChangelogOperation(changeTask.operation()), changeTask.changeOrdinal(), diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index da2c1c41a5a40..0d4c3e3c13640 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -148,6 +148,7 @@ public void testShowCreateTable() " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")", schemaName, getLocation(schemaName, "orders"))); } @@ -430,6 +431,7 @@ protected void testCreatePartitionedTableAs(Session session, FileFormat fileForm " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + " partitioning = ARRAY['order_status','ship_priority','bucket(order_key, 9)'],\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getSession().getCatalog().get(), @@ -634,6 +636,7 @@ public void testTableComments() " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")"; String createTableSql = format(createTableTemplate, schemaName, "test table comment", getLocation(schemaName, "test_table_comments")); @@ -727,6 +730,7 @@ private void testCreateTableLike() " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + " partitioning = ARRAY['adate'],\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")", 
getLocation(schemaName, "test_create_table_like_original"))); @@ -744,6 +748,7 @@ private void testCreateTableLike() " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getLocation(schemaName, "test_create_table_like_copy1"))); dropTable(session, "test_create_table_like_copy1"); @@ -757,6 +762,7 @@ private void testCreateTableLike() " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getLocation(schemaName, "test_create_table_like_copy2"))); dropTable(session, "test_create_table_like_copy2"); @@ -772,6 +778,7 @@ private void testCreateTableLike() " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + " partitioning = ARRAY['adate'],\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getLocation(schemaName, "test_create_table_like_original"))); @@ -787,6 +794,7 @@ private void testCreateTableLike() " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + " partitioning = ARRAY['adate'],\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getLocation(schemaName, "test_create_table_like_original"))); @@ -804,6 +812,7 @@ private void testCreateTableLike() " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + " partitioning = ARRAY['adate'],\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getLocation(schemaName, "test_create_table_like_copy5"))); @@ -852,6 +861,7 @@ protected void testCreateTableWithFormatVersion(String formatVersion, String def " 
metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = '%s'\n" + ")", getSession().getCatalog().get(), diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSplitManager.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSplitManager.java index af92ef89fad4e..73047fb4c9f00 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSplitManager.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSplitManager.java @@ -30,15 +30,19 @@ import com.facebook.presto.tests.AbstractTestQueryFramework; import com.facebook.presto.transaction.TransactionManager; import com.google.common.collect.ImmutableMap; +import org.apache.iceberg.TableProperties; import org.testng.annotations.Test; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED; +import static com.facebook.presto.iceberg.IcebergSessionProperties.TARGET_SPLIT_SIZE; import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -166,6 +170,36 @@ private void testGetSplitsByNonIdentityPartitionColumns(String tableName, boolea assertQuerySucceeds("DROP TABLE " + tableName); } + @Test + public void testSplitSchedulingWithTablePropertyAndSession() + { + Session session = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", IcebergSessionProperties.TARGET_SPLIT_SIZE, "0") + 
.build(); + assertQuerySucceeds("CREATE TABLE test_split_size as SELECT * FROM UNNEST(sequence(1, 512)) as t(i)"); + // verify that the session property hasn't propagated into the table + assertEquals(getQueryRunner().execute("SELECT value FROM \"test_split_size$properties\" WHERE key = 'read.split.target-size'").getOnlyValue(), + Long.toString(TableProperties.SPLIT_SIZE_DEFAULT)); + assertQuerySucceeds("ALTER TABLE test_split_size SET PROPERTIES (\"read.split.target-size\" = 1)"); + String selectQuery = "SELECT * FROM test_split_size"; + long maxSplits = getSplitsForSql(session, selectQuery); + + IntStream.range(1, 5) + .mapToObj(i -> Math.pow(2, i)) + .forEach(splitSize -> { + assertQuerySucceeds("ALTER TABLE test_split_size SET PROPERTIES (\"read.split.target-size\" =" + splitSize.intValue() + ")"); + assertEquals(getSplitsForSql(session, selectQuery), (double) maxSplits / splitSize, 5); + }); + // split size should be set to 16 on the table property (2^4, the last value set by the loop above). + // Set it to 1 with the session property to override the table value and verify we get the + // same number of splits as when the table value is set to 1. 
+ Session minSplitSession = Session.builder(session) + .setCatalogSessionProperty("iceberg", TARGET_SPLIT_SIZE, "1") + .build(); + assertEquals(getSplitsForSql(minSplitSession, selectQuery), maxSplits); + assertQuerySucceeds("DROP TABLE test_split_size"); + } + private Session sessionWithFilterPushdown(boolean pushdown) { return Session.builder(getQueryRunner().getDefaultSession()) @@ -173,6 +207,37 @@ private Session sessionWithFilterPushdown(boolean pushdown) .build(); } + private long getSplitsForSql(Session session, String sql) + { + TransactionManager transactionManager = getQueryRunner().getTransactionManager(); + SplitManager splitManager = getQueryRunner().getSplitManager(); + + List tableScanNodes = getTableScanFromOptimizedPlanOfSql(sql, session); + assertNotNull(tableScanNodes); + assertEquals(tableScanNodes.size(), 1); + + TransactionId transactionId = transactionManager.beginTransaction(false); + session = session.beginTransactionId(transactionId, transactionManager, new AllowAllAccessControl()); + TableHandle tableHandle = tableScanNodes.get(0).getTable(); + TableHandle newTableHandle = new TableHandle(tableHandle.getConnectorId(), + tableHandle.getConnectorHandle(), + transactionManager.getConnectorTransaction(transactionId, tableHandle.getConnectorId()), + tableHandle.getLayout(), + tableHandle.getDynamicFilter()); + + try (SplitSource splitSource = splitManager.getSplits(session, newTableHandle, SplitSchedulingStrategy.UNGROUPED_SCHEDULING, WarningCollector.NOOP)) { + int splits = 0; + while (!splitSource.isFinished()) { + splits += splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1024).get().getSplits().size(); + } + assertTrue(splitSource.isFinished()); + return splits; + } + catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + private void validateSplitsPlannedForSql(SplitManager splitManager, TransactionManager transactionManager, boolean filterPushdown, diff --git 
a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java index 271137a0e52f8..92016c4e263f9 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java @@ -23,6 +23,7 @@ import com.facebook.presto.tests.DistributedQueryRunner; import com.facebook.presto.transaction.TransactionManager; import com.google.common.collect.ImmutableMap; +import io.airlift.units.DataSize; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -258,11 +259,11 @@ protected void checkTableProperties(String tableName, String deleteMode) { assertQuery(String.format("SHOW COLUMNS FROM test_schema.\"%s$properties\"", tableName), "VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')"); - assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 8"); + assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 9"); List materializedRows = computeActual(getSession(), String.format("SELECT * FROM test_schema.\"%s$properties\"", tableName)).getMaterializedRows(); - assertThat(materializedRows).hasSize(8); + assertThat(materializedRows).hasSize(9); assertThat(materializedRows) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode))) @@ -279,18 +280,20 @@ protected void checkTableProperties(String tableName, String deleteMode) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "false"))) .anySatisfy(row -> assertThat(row) - .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, 
"write.metadata.metrics.max-inferred-column-defaults", "100"))); + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.metrics.max-inferred-column-defaults", "100"))) + .anySatisfy(row -> assertThat(row) + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, IcebergTableProperties.TARGET_SPLIT_SIZE, Long.toString(DataSize.valueOf("128MB").toBytes())))); } protected void checkORCFormatTableProperties(String tableName, String deleteMode) { assertQuery(String.format("SHOW COLUMNS FROM test_schema.\"%s$properties\"", tableName), "VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')"); - assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 9"); + assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 10"); List materializedRows = computeActual(getSession(), String.format("SELECT * FROM test_schema.\"%s$properties\"", tableName)).getMaterializedRows(); - assertThat(materializedRows).hasSize(9); + assertThat(materializedRows).hasSize(10); assertThat(materializedRows) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode))) @@ -309,7 +312,9 @@ protected void checkORCFormatTableProperties(String tableName, String deleteMode .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "false"))) .anySatisfy(row -> assertThat(row) - .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.metrics.max-inferred-column-defaults", "100"))); + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.metrics.max-inferred-column-defaults", "100"))) + .anySatisfy(row -> assertThat(row) + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, IcebergTableProperties.TARGET_SPLIT_SIZE, 
Long.toString(DataSize.valueOf("128MB").toBytes()))));
     }
 
     @Test
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergUtil.java
index 5c8781d3dcfe1..47c06c962a9e4 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergUtil.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergUtil.java
@@ -41,9 +41,11 @@
 import static com.facebook.presto.iceberg.IcebergUtil.REAL_POSITIVE_INFINITE;
 import static com.facebook.presto.iceberg.IcebergUtil.REAL_POSITIVE_ZERO;
 import static com.facebook.presto.iceberg.IcebergUtil.getAdjacentValue;
+import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
 import static java.lang.Double.longBitsToDouble;
 import static java.lang.Float.intBitsToFloat;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertEquals;
 
 public class TestIcebergUtil
 {
@@ -368,4 +370,16 @@ public void testNextValueForOtherType()
                         encodeScaledValue(new BigDecimal(111111111111111123.45)), false))
                 .isEmpty();
     }
+
+    /**
+     * Verifies that a non-zero first argument to {@code getTargetSplitSize} is returned as the split size,
+     * and that a zero first argument falls back to the second argument.
+     */
+    @Test
+    public void testGetTargetSplitSize()
+    {
+        // TestNG's assertEquals takes (actual, expected); keeping that order makes failure messages accurate
+        assertEquals(getTargetSplitSize(1024, 512).toBytes(), 1024);
+        assertEquals(getTargetSplitSize(0, 512).toBytes(), 512);
+    }
 }
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java
index 5f606777596d0..e898da70777df 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java
@@ -16,6 +16,7 @@
 import com.facebook.presto.Session;
 import com.facebook.presto.iceberg.IcebergConfig;
 import com.facebook.presto.iceberg.IcebergPlugin;
+import com.facebook.presto.iceberg.IcebergTableProperties;
import com.facebook.presto.iceberg.TestIcebergSystemTables; import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.MaterializedRow; @@ -23,6 +24,7 @@ import com.facebook.presto.testing.containers.NessieContainer; import com.facebook.presto.tests.DistributedQueryRunner; import com.google.common.collect.ImmutableMap; +import io.airlift.units.DataSize; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -89,11 +91,11 @@ protected void checkTableProperties(String tableName, String deleteMode) { assertQuery(String.format("SHOW COLUMNS FROM test_schema.\"%s$properties\"", tableName), "VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')"); - assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 10"); + assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 11"); List materializedRows = computeActual(getSession(), String.format("SELECT * FROM test_schema.\"%s$properties\"", tableName)).getMaterializedRows(); - assertThat(materializedRows).hasSize(10); + assertThat(materializedRows).hasSize(11); assertThat(materializedRows) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode))) @@ -110,7 +112,9 @@ protected void checkTableProperties(String tableName, String deleteMode) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.previous-versions-max", "100"))) .anySatisfy(row -> assertThat(row) - .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.metrics.max-inferred-column-defaults", "100"))); + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.metrics.max-inferred-column-defaults", "100"))) + .anySatisfy(row -> assertThat(row) + .isEqualTo(new 
MaterializedRow(MaterializedResult.DEFAULT_PRECISION, IcebergTableProperties.TARGET_SPLIT_SIZE, Long.toString(DataSize.valueOf("128MB").toBytes())))); } @Override @@ -118,11 +122,11 @@ protected void checkORCFormatTableProperties(String tableName, String deleteMode { assertQuery(String.format("SHOW COLUMNS FROM test_schema.\"%s$properties\"", tableName), "VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')"); - assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 11"); + assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 12"); List materializedRows = computeActual(getSession(), String.format("SELECT * FROM test_schema.\"%s$properties\"", tableName)).getMaterializedRows(); - assertThat(materializedRows).hasSize(11); + assertThat(materializedRows).hasSize(12); assertThat(materializedRows) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode))) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestSetTablePropertyProcedure.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestSetTablePropertyProcedure.java index ef809bbcf725d..9c0b900cc89d9 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestSetTablePropertyProcedure.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestSetTablePropertyProcedure.java @@ -33,6 +33,7 @@ import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath; import static java.lang.String.format; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; import static org.testng.Assert.assertEquals; public class TestSetTablePropertyProcedure @@ -71,8 +72,8 @@ public void testSetTablePropertyProcedurePositionalArgs() 
Table table = loadTable(tableName); table.refresh(); - assertEquals(table.properties().size(), 8); - assertEquals(table.properties().get(propertyKey), null); + assertEquals(table.properties().size(), 9); + assertEquals(Long.parseLong(table.properties().get(propertyKey)), SPLIT_SIZE_DEFAULT); assertUpdate(format("CALL system.set_table_property('%s', '%s', '%s', '%s')", TEST_SCHEMA, tableName, propertyKey, propertyValue)); table.refresh(); @@ -99,8 +100,8 @@ public void testSetTablePropertyProcedureNamedArgs() Table table = loadTable(tableName); table.refresh(); - assertEquals(table.properties().size(), 8); - assertEquals(table.properties().get(propertyKey), null); + assertEquals(table.properties().size(), 9); + assertEquals(Long.parseLong(table.properties().get(propertyKey)), SPLIT_SIZE_DEFAULT); assertUpdate(format("CALL system.set_table_property(schema => '%s', key => '%s', value => '%s', table_name => '%s')", TEST_SCHEMA, propertyKey, propertyValue, tableName)); @@ -129,14 +130,14 @@ public void testSetTablePropertyProcedureUpdateExisting() Table table = loadTable(tableName); table.refresh(); - assertEquals(table.properties().size(), 8); + assertEquals(table.properties().size(), 9); assertEquals(table.properties().get(propertyKey), "4"); assertUpdate(format("CALL system.set_table_property('%s', '%s', '%s', '%s')", TEST_SCHEMA, tableName, propertyKey, propertyValue)); table.refresh(); // now the table property commit.retry.num-retries should have new value - assertEquals(table.properties().size(), 8); + assertEquals(table.properties().size(), 9); assertEquals(table.properties().get(propertyKey), propertyValue); } finally { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java index 6131b16b751bd..b5aa83bfe05d7 100644 --- 
a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java @@ -181,6 +181,7 @@ public void testShowCreateTable() " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")", schemaName, getLocation(schemaName, "orders"))); } @@ -217,6 +218,7 @@ public void testTableComments() " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")"; String createTableSql = format(createTableTemplate, schemaName, "test table comment", getLocation(schemaName, "test_table_comments")); @@ -258,6 +260,7 @@ protected void testCreatePartitionedTableAs(Session session, FileFormat fileForm " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + " partitioning = ARRAY['order_status','ship_priority','bucket(order_key, 9)'],\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getSession().getCatalog().get(), @@ -319,6 +322,7 @@ protected void testCreateTableWithFormatVersion(String formatVersion, String def " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + + " \"read.split.target-size\" = 134217728,\n" + " \"write.update.mode\" = '%s'\n" + ")", getSession().getCatalog().get(),