Skip to content

Commit 5030328

Browse files
committed
[Iceberg] Session property for target split size
1 parent 18cef11 commit 5030328

15 files changed

+214
-53
lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

+10-3
Original file line numberDiff line numberDiff line change
@@ -357,9 +357,9 @@ connector using a WITH clause:
357357

358358
The following table properties are available, which are specific to the Presto Iceberg connector:
359359

360-
======================================= =============================================================== ============
360+
======================================= =============================================================== =========================
361361
Property Name Description Default
362-
======================================= =============================================================== ============
362+
======================================= =============================================================== =========================
363363
``format`` Optionally specifies the format of table data files, ``PARQUET``
364364
either ``PARQUET`` or ``ORC``.
365365

@@ -388,7 +388,11 @@ Property Name Description
388388

389389
``metrics_max_inferred_column`` Optionally specifies the maximum number of columns for which ``100``
390390
metrics are collected.
391-
======================================= =============================================================== ============
391+
392+
``read.split.target-size`` The target size for an individual split when generating splits ``134217728`` (128MB)
393+
for a table scan. Generated splits may still be larger or
394+
smaller than this value. Must be specified in bytes.
395+
======================================= =============================================================== =========================
392396

393397
The table definition below specifies format ``ORC``, partitioning by columns ``c1`` and ``c2``,
394398
and a file system location of ``s3://test_bucket/test_schema/test_table``:
@@ -421,6 +425,9 @@ Property Name Description
421425
``iceberg.rows_for_metadata_optimization_threshold`` Overrides the behavior of the connector property
422426
``iceberg.rows-for-metadata-optimization-threshold`` in the current
423427
session.
428+
``iceberg.target_split_size`` Overrides the target split size, in bytes, for all tables in a query.
429+
Set to 0 to use the value in each Iceberg table's
430+
``read.split.target-size`` property.
424431
===================================================== ======================================================================
425432

426433
Caching Support

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

+5
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@
201201
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
202202
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
203203
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
204+
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
204205
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
205206

206207
public abstract class IcebergAbstractMetadata
@@ -719,6 +720,7 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
719720
properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable));
720721
properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable));
721722
properties.put(METRICS_MAX_INFERRED_COLUMN, IcebergUtil.getMetricsMaxInferredColumn(icebergTable));
723+
properties.put(SPLIT_SIZE, IcebergUtil.getSplitSize(icebergTable));
722724

723725
SortOrder sortOrder = icebergTable.sortOrder();
724726
// TODO: Support sort column transforms (https://github.com/prestodb/presto/issues/24250)
@@ -1127,6 +1129,9 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
11271129
case COMMIT_RETRIES:
11281130
updateProperties.set(TableProperties.COMMIT_NUM_RETRIES, String.valueOf(entry.getValue()));
11291131
break;
1132+
case SPLIT_SIZE:
1133+
updateProperties.set(TableProperties.SPLIT_SIZE, entry.getValue().toString());
1134+
break;
11301135
default:
11311136
throw new PrestoException(NOT_SUPPORTED, "Updating property " + entry.getKey() + " is not supported currently");
11321137
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java

+12
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
4141
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
4242
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
43+
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
4344
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
4445

4546
public final class IcebergSessionProperties
@@ -65,6 +66,7 @@ public final class IcebergSessionProperties
6566
public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight";
6667
public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold";
6768
public static final String STATISTICS_KLL_SKETCH_K_PARAMETER = "statistics_kll_sketch_k_parameter";
69+
public static final String TARGET_SPLIT_SIZE = "target_split_size";
6870

6971
private final List<PropertyMetadata<?>> sessionProperties;
7072

@@ -189,6 +191,11 @@ public IcebergSessionProperties(
189191
.add(integerProperty(STATISTICS_KLL_SKETCH_K_PARAMETER,
190192
"The K parameter for the Apache DataSketches KLL sketch when computing histogram statistics",
191193
icebergConfig.getStatisticsKllSketchKParameter(),
194+
false))
195+
.add(longProperty(
196+
TARGET_SPLIT_SIZE,
197+
"The target split size. Set to 0 to use the iceberg table's read.split.target-size property",
198+
0L,
192199
false));
193200

194201
nessieConfig.ifPresent((config) -> propertiesBuilder
@@ -323,4 +330,9 @@ public static int getStatisticsKllSketchKParameter(ConnectorSession session)
323330
{
324331
return session.getProperty(STATISTICS_KLL_SKETCH_K_PARAMETER, Integer.class);
325332
}
333+
334+
public static Long getTargetSplitSize(ConnectorSession session)
335+
{
336+
return session.getProperty(TARGET_SPLIT_SIZE, Long.class);
337+
}
326338
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.iceberg.TableScan;
3232
import org.apache.iceberg.io.CloseableIterable;
3333
import org.apache.iceberg.util.SnapshotUtil;
34-
import org.apache.iceberg.util.TableScanUtil;
3534
import org.weakref.jmx.Managed;
3635
import org.weakref.jmx.Nested;
3736

@@ -41,7 +40,6 @@
4140
import java.util.concurrent.ThreadPoolExecutor;
4241

4342
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
44-
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
4543
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
4644
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
4745
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
@@ -95,7 +93,7 @@ public ConnectorSplitSource getSplits(
9593
IncrementalChangelogScan scan = icebergTable.newIncrementalChangelogScan()
9694
.fromSnapshotExclusive(fromSnapshot)
9795
.toSnapshot(toSnapshot);
98-
return new ChangelogSplitSource(session, typeManager, icebergTable, scan, scan.targetSplitSize());
96+
return new ChangelogSplitSource(session, typeManager, icebergTable, scan);
9997
}
10098
else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
10199
CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
@@ -117,8 +115,6 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
117115
IcebergSplitSource splitSource = new IcebergSplitSource(
118116
session,
119117
tableScan,
120-
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
121-
getMinimumAssignedSplitWeight(session),
122118
getMetadataColumnConstraints(layoutHandle.getValidPredicate()));
123119
return splitSource;
124120
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java

+17-12
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020
import com.facebook.presto.spi.ConnectorSplitSource;
2121
import com.facebook.presto.spi.SplitWeight;
2222
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
23+
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
2324
import com.google.common.collect.ImmutableList;
2425
import com.google.common.io.Closer;
2526
import org.apache.iceberg.FileScanTask;
2627
import org.apache.iceberg.PartitionSpec;
2728
import org.apache.iceberg.PartitionSpecParser;
2829
import org.apache.iceberg.TableScan;
29-
import org.apache.iceberg.io.CloseableIterable;
3030
import org.apache.iceberg.io.CloseableIterator;
3131

3232
import java.io.IOException;
@@ -39,40 +39,45 @@
3939

4040
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
4141
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
42+
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
4243
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
4344
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
45+
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
4446
import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
4547
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
4648
import static com.google.common.collect.ImmutableList.toImmutableList;
4749
import static com.google.common.collect.Iterators.limit;
4850
import static java.util.Objects.requireNonNull;
4951
import static java.util.concurrent.CompletableFuture.completedFuture;
52+
import static org.apache.iceberg.util.TableScanUtil.splitFiles;
5053

5154
public class IcebergSplitSource
5255
implements ConnectorSplitSource
5356
{
5457
private CloseableIterator<FileScanTask> fileScanTaskIterator;
5558

56-
private final TableScan tableScan;
5759
private final Closer closer = Closer.create();
5860
private final double minimumAssignedSplitWeight;
59-
private final ConnectorSession session;
61+
private final long targetSplitSize;
62+
private final NodeSelectionStrategy nodeSelectionStrategy;
6063

6164
private final TupleDomain<IcebergColumnHandle> metadataColumnConstraints;
6265

6366
public IcebergSplitSource(
6467
ConnectorSession session,
6568
TableScan tableScan,
66-
CloseableIterable<FileScanTask> fileScanTaskIterable,
67-
double minimumAssignedSplitWeight,
6869
TupleDomain<IcebergColumnHandle> metadataColumnConstraints)
6970
{
70-
this.session = requireNonNull(session, "session is null");
71-
this.tableScan = requireNonNull(tableScan, "tableScan is null");
72-
this.fileScanTaskIterator = fileScanTaskIterable.iterator();
73-
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
71+
requireNonNull(session, "session is null");
7472
this.metadataColumnConstraints = requireNonNull(metadataColumnConstraints, "metadataColumnConstraints is null");
75-
closer.register(fileScanTaskIterator);
73+
this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes();
74+
this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session);
75+
this.nodeSelectionStrategy = getNodeSelectionStrategy(session);
76+
this.fileScanTaskIterator = closer.register(
77+
splitFiles(
78+
closer.register(tableScan.planFiles()),
79+
targetSplitSize)
80+
.iterator());
7681
}
7782

7883
@Override
@@ -130,8 +135,8 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
130135
getPartitionKeys(task),
131136
PartitionSpecParser.toJson(spec),
132137
partitionData.map(PartitionData::toJson),
133-
getNodeSelectionStrategy(session),
134-
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
138+
nodeSelectionStrategy,
139+
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / targetSplitSize, minimumAssignedSplitWeight), 1.0)),
135140
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
136141
Optional.empty(),
137142
getDataSequenceNumber(task.file()));

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java

+11
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
3030
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
3131
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
32+
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
3233
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
3334
import static com.google.common.collect.ImmutableList.toImmutableList;
3435
import static java.util.Locale.ENGLISH;
@@ -47,6 +48,7 @@ public class IcebergTableProperties
4748
public static final String METADATA_PREVIOUS_VERSIONS_MAX = "metadata_previous_versions_max";
4849
public static final String METADATA_DELETE_AFTER_COMMIT = "metadata_delete_after_commit";
4950
public static final String METRICS_MAX_INFERRED_COLUMN = "metrics_max_inferred_column";
51+
public static final String TARGET_SPLIT_SIZE = TableProperties.SPLIT_SIZE;
5052
private static final String DEFAULT_FORMAT_VERSION = "2";
5153

5254
private final List<PropertyMetadata<?>> tableProperties;
@@ -133,6 +135,10 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
133135
false,
134136
value -> RowLevelOperationMode.fromName((String) value),
135137
RowLevelOperationMode::modeName))
138+
.add(longProperty(TARGET_SPLIT_SIZE,
139+
"Desired size of split to generate during query scan planning",
140+
TableProperties.SPLIT_SIZE_DEFAULT,
141+
false))
136142
.build();
137143

138144
columnProperties = ImmutableList.of(stringProperty(
@@ -210,4 +216,9 @@ public static RowLevelOperationMode getUpdateMode(Map<String, Object> tablePrope
210216
{
211217
return (RowLevelOperationMode) tableProperties.get(UPDATE_MODE);
212218
}
219+
220+
public static Long getTargetSplitSize(Map<String, Object> tableProperties)
221+
{
222+
return (Long) tableProperties.get(TableProperties.SPLIT_SIZE);
223+
}
213224
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java

+27
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import com.google.common.collect.ImmutableMap;
5050
import com.google.common.collect.ImmutableSet;
5151
import com.google.common.collect.Sets;
52+
import io.airlift.units.DataSize;
5253
import org.apache.iceberg.BaseTable;
5354
import org.apache.iceberg.ContentFile;
5455
import org.apache.iceberg.ContentScanTask;
@@ -61,6 +62,7 @@
6162
import org.apache.iceberg.PartitionField;
6263
import org.apache.iceberg.PartitionSpec;
6364
import org.apache.iceberg.RowLevelOperationMode;
65+
import org.apache.iceberg.Scan;
6466
import org.apache.iceberg.Schema;
6567
import org.apache.iceberg.Snapshot;
6668
import org.apache.iceberg.SortOrder;
@@ -157,6 +159,7 @@
157159
import static com.google.common.collect.Streams.stream;
158160
import static io.airlift.slice.Slices.utf8Slice;
159161
import static io.airlift.slice.Slices.wrappedBuffer;
162+
import static io.airlift.units.DataSize.succinctBytes;
160163
import static java.lang.Double.doubleToRawLongBits;
161164
import static java.lang.Double.longBitsToDouble;
162165
import static java.lang.Double.parseDouble;
@@ -195,6 +198,8 @@
195198
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT;
196199
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
197200
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
201+
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
202+
import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
198203
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
199204
import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
200205
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
@@ -1176,6 +1181,9 @@ public static Map<String, String> populateTableProperties(ConnectorTableMetadata
11761181

11771182
Integer metricsMaxInferredColumn = IcebergTableProperties.getMetricsMaxInferredColumn(tableMetadata.getProperties());
11781183
propertiesBuilder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS, String.valueOf(metricsMaxInferredColumn));
1184+
1185+
propertiesBuilder.put(SPLIT_SIZE, String.valueOf(IcebergTableProperties.getTargetSplitSize(tableMetadata.getProperties())));
1186+
11791187
return propertiesBuilder.build();
11801188
}
11811189

@@ -1286,4 +1294,23 @@ public static String dataLocation(Table icebergTable)
12861294
}
12871295
return dataLocation;
12881296
}
1297+
1298+
public static Long getSplitSize(Table table)
1299+
{
1300+
return Long.parseLong(table.properties()
1301+
.getOrDefault(SPLIT_SIZE,
1302+
String.valueOf(SPLIT_SIZE_DEFAULT)));
1303+
}
1304+
1305+
public static DataSize getTargetSplitSize(long sessionValueProperty, long icebergScanTargetSplitSize)
1306+
{
1307+
return sessionValueProperty == 0 ?
1308+
succinctBytes(icebergScanTargetSplitSize) :
1309+
succinctBytes(sessionValueProperty);
1310+
}
1311+
1312+
public static DataSize getTargetSplitSize(ConnectorSession session, Scan<?, ?, ?> scan)
1313+
{
1314+
return getTargetSplitSize(IcebergSessionProperties.getTargetSplitSize(session), scan.targetSplitSize());
1315+
}
12891316
}

0 commit comments

Comments
 (0)