Skip to content

Commit 958ceec

Browse files
committed
[Iceberg] Session property for target split size
1 parent 6d57079 commit 958ceec

9 files changed

+159
-19
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

+6
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import com.google.common.collect.ImmutableSet;
6969
import com.google.common.collect.Maps;
7070
import io.airlift.slice.Slice;
71+
import io.airlift.units.DataSize;
7172
import org.apache.hadoop.fs.Path;
7273
import org.apache.iceberg.AppendFiles;
7374
import org.apache.iceberg.BaseTable;
@@ -180,6 +181,7 @@
180181
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
181182
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
182183
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
184+
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
183185

184186
public abstract class IcebergAbstractMetadata
185187
implements ConnectorMetadata
@@ -612,6 +614,7 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
612614
properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable));
613615
properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable));
614616
properties.put(METRICS_MAX_INFERRED_COLUMN, IcebergUtil.getMetricsMaxInferredColumn(icebergTable));
617+
properties.put(SPLIT_SIZE, IcebergUtil.getSplitSize(icebergTable));
615618

616619
return properties.build();
617620
}
@@ -1014,6 +1017,9 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
10141017
case COMMIT_RETRIES:
10151018
updateProperties.set(TableProperties.COMMIT_NUM_RETRIES, String.valueOf(entry.getValue()));
10161019
break;
1020+
case SPLIT_SIZE:
1021+
updateProperties.set(TableProperties.SPLIT_SIZE, entry.getValue().toString());
1022+
break;
10171023
default:
10181024
throw new PrestoException(NOT_SUPPORTED, "Updating property " + entry.getKey() + " is not supported currently");
10191025
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java

+12
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
4141
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
4242
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
43+
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
4344
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
4445

4546
public final class IcebergSessionProperties
@@ -65,6 +66,7 @@ public final class IcebergSessionProperties
6566
public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight";
6667
public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold";
6768
public static final String STATISTICS_KLL_SKETCH_K_PARAMETER = "statistics_kll_sketch_k_parameter";
69+
public static final String TARGET_SPLIT_SIZE = "target_split_size";
6870

6971
private final List<PropertyMetadata<?>> sessionProperties;
7072

@@ -189,6 +191,11 @@ public IcebergSessionProperties(
189191
.add(integerProperty(STATISTICS_KLL_SKETCH_K_PARAMETER,
190192
"The K parameter for the Apache DataSketches KLL sketch when computing histogram statistics",
191193
icebergConfig.getStatisticsKllSketchKParameter(),
194+
false))
195+
.add(longProperty(
196+
TARGET_SPLIT_SIZE,
197+
"The target split size. Set to 0 to use the iceberg table's read.split.target-size property",
198+
0L,
192199
false));
193200

194201
nessieConfig.ifPresent((config) -> propertiesBuilder
@@ -323,4 +330,9 @@ public static int getStatisticsKllSketchKParameter(ConnectorSession session)
323330
{
324331
return session.getProperty(STATISTICS_KLL_SKETCH_K_PARAMETER, Integer.class);
325332
}
333+
334+
public static Long getTargetSplitSize(ConnectorSession session)
335+
{
336+
return session.getProperty(TARGET_SPLIT_SIZE, Long.class);
337+
}
326338
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
4848
import static com.facebook.presto.iceberg.IcebergUtil.getMetadataColumnConstraints;
4949
import static com.facebook.presto.iceberg.IcebergUtil.getNonMetadataColumnConstraints;
50+
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
5051
import static java.util.Objects.requireNonNull;
5152

5253
public class IcebergSplitManager
@@ -95,7 +96,7 @@ public ConnectorSplitSource getSplits(
9596
IncrementalChangelogScan scan = icebergTable.newIncrementalChangelogScan()
9697
.fromSnapshotExclusive(fromSnapshot)
9798
.toSnapshot(toSnapshot);
98-
return new ChangelogSplitSource(session, typeManager, icebergTable, scan, scan.targetSplitSize());
99+
return new ChangelogSplitSource(session, typeManager, icebergTable, scan, getTargetSplitSize(session, scan).toBytes());
99100
}
100101
else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
101102
CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
@@ -117,7 +118,7 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
117118
IcebergSplitSource splitSource = new IcebergSplitSource(
118119
session,
119120
tableScan,
120-
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
121+
TableScanUtil.splitFiles(tableScan.planFiles(), getTargetSplitSize(session, tableScan).toBytes()),
121122
getMinimumAssignedSplitWeight(session),
122123
getMetadataColumnConstraints(layoutHandle.getValidPredicate()));
123124
return splitSource;

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
4242
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
4343
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
44+
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
4445
import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
4546
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
4647
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -131,7 +132,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
131132
PartitionSpecParser.toJson(spec),
132133
partitionData.map(PartitionData::toJson),
133134
getNodeSelectionStrategy(session),
134-
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
135+
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / getTargetSplitSize(session, tableScan).toBytes(), minimumAssignedSplitWeight), 1.0)),
135136
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
136137
Optional.empty(),
137138
getDataSequenceNumber(task.file()));

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java

+16
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,25 @@
1616
import com.facebook.presto.common.type.ArrayType;
1717
import com.facebook.presto.spi.session.PropertyMetadata;
1818
import com.google.common.collect.ImmutableList;
19+
import io.airlift.units.DataSize;
1920
import org.apache.iceberg.RowLevelOperationMode;
2021
import org.apache.iceberg.TableProperties;
2122

2223
import javax.inject.Inject;
2324

25+
import java.lang.reflect.AccessibleObject;
26+
import java.util.Arrays;
2427
import java.util.Collection;
2528
import java.util.List;
2629
import java.util.Map;
30+
import java.util.Set;
2731

2832
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
2933
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
3034
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
35+
import static com.facebook.presto.spi.session.PropertyMetadata.dataSizeProperty;
3136
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
37+
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
3238
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
3339
import static com.google.common.collect.ImmutableList.toImmutableList;
3440
import static java.util.Locale.ENGLISH;
@@ -44,6 +50,7 @@ public class IcebergTableProperties
4450
public static final String METADATA_PREVIOUS_VERSIONS_MAX = "metadata_previous_versions_max";
4551
public static final String METADATA_DELETE_AFTER_COMMIT = "metadata_delete_after_commit";
4652
public static final String METRICS_MAX_INFERRED_COLUMN = "metrics_max_inferred_column";
53+
public static final String TARGET_SPLIT_SIZE = TableProperties.SPLIT_SIZE;
4754
private static final String DEFAULT_FORMAT_VERSION = "2";
4855

4956
private final List<PropertyMetadata<?>> tableProperties;
@@ -112,6 +119,10 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
112119
"The maximum number of columns for which metrics are collected",
113120
icebergConfig.getMetricsMaxInferredColumn(),
114121
false))
122+
.add(longProperty(TARGET_SPLIT_SIZE,
123+
"Desired size of split to generate during query scan planning",
124+
TableProperties.SPLIT_SIZE_DEFAULT,
125+
false))
115126
.build();
116127

117128
columnProperties = ImmutableList.of(stringProperty(
@@ -177,4 +188,9 @@ public static Integer getMetricsMaxInferredColumn(Map<String, Object> tablePrope
177188
{
178189
return (Integer) tableProperties.get(METRICS_MAX_INFERRED_COLUMN);
179190
}
191+
192+
public static Long getTargetSplitSize(Map<String, Object> tableProperties)
193+
{
194+
return (Long) tableProperties.get(TableProperties.SPLIT_SIZE);
195+
}
180196
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java

+36-11
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import com.google.common.collect.ImmutableMap;
5050
import com.google.common.collect.ImmutableSet;
5151
import com.google.common.collect.Sets;
52+
import io.airlift.units.DataSize;
5253
import org.apache.iceberg.BaseTable;
5354
import org.apache.iceberg.ContentFile;
5455
import org.apache.iceberg.ContentScanTask;
@@ -61,6 +62,7 @@
6162
import org.apache.iceberg.PartitionField;
6263
import org.apache.iceberg.PartitionSpec;
6364
import org.apache.iceberg.RowLevelOperationMode;
65+
import org.apache.iceberg.Scan;
6466
import org.apache.iceberg.Schema;
6567
import org.apache.iceberg.Snapshot;
6668
import org.apache.iceberg.SortOrder;
@@ -196,6 +198,8 @@
196198
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT;
197199
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
198200
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
201+
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
202+
import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
199203
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
200204
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
201205
import static org.apache.iceberg.types.Type.TypeID.BINARY;
@@ -863,10 +867,10 @@ public static long getDataSequenceNumber(ContentFile<?> file)
863867
* @param requestedSchema If provided, only delete files with this schema will be provided
864868
*/
865869
public static CloseableIterable<DeleteFile> getDeleteFiles(Table table,
866-
long snapshot,
867-
TupleDomain<IcebergColumnHandle> filter,
868-
Optional<Set<Integer>> requestedPartitionSpec,
869-
Optional<Set<Integer>> requestedSchema)
870+
long snapshot,
871+
TupleDomain<IcebergColumnHandle> filter,
872+
Optional<Set<Integer>> requestedPartitionSpec,
873+
Optional<Set<Integer>> requestedSchema)
870874
{
871875
Expression filterExpression = toIcebergExpression(filter);
872876
CloseableIterable<FileScanTask> fileTasks = table.newScan().useSnapshot(snapshot).filter(filterExpression).planFiles();
@@ -1042,9 +1046,9 @@ private static class DeleteFilesIterator
10421046
private DeleteFile currentFile;
10431047

10441048
private DeleteFilesIterator(Map<Integer, PartitionSpec> partitionSpecsById,
1045-
CloseableIterator<FileScanTask> fileTasks,
1046-
Optional<Set<Integer>> requestedPartitionSpec,
1047-
Optional<Set<Integer>> requestedSchema)
1049+
CloseableIterator<FileScanTask> fileTasks,
1050+
Optional<Set<Integer>> requestedPartitionSpec,
1051+
Optional<Set<Integer>> requestedSchema)
10481052
{
10491053
this.partitionSpecsById = partitionSpecsById;
10501054
this.fileTasks = fileTasks;
@@ -1158,6 +1162,9 @@ public static Map<String, String> populateTableProperties(ConnectorTableMetadata
11581162

11591163
Integer metricsMaxInferredColumn = IcebergTableProperties.getMetricsMaxInferredColumn(tableMetadata.getProperties());
11601164
propertiesBuilder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS, String.valueOf(metricsMaxInferredColumn));
1165+
1166+
propertiesBuilder.put(SPLIT_SIZE, String.valueOf(IcebergTableProperties.getTargetSplitSize(tableMetadata.getProperties())));
1167+
11611168
return propertiesBuilder.build();
11621169
}
11631170

@@ -1228,8 +1235,8 @@ public static Optional<PartitionData> partitionDataFromStructLike(PartitionSpec
12281235

12291236
/**
12301237
* Get the metadata location for target {@link Table},
1231-
* considering iceberg table properties {@code WRITE_METADATA_LOCATION}
1232-
* */
1238+
* considering iceberg table properties {@code WRITE_METADATA_LOCATION}
1239+
*/
12331240
public static String metadataLocation(Table icebergTable)
12341241
{
12351242
String metadataLocation = icebergTable.properties().get(TableProperties.WRITE_METADATA_LOCATION);
@@ -1244,8 +1251,8 @@ public static String metadataLocation(Table icebergTable)
12441251

12451252
/**
12461253
* Get the data location for target {@link Table},
1247-
* considering iceberg table properties {@code WRITE_DATA_LOCATION}, {@code OBJECT_STORE_PATH} and {@code WRITE_FOLDER_STORAGE_LOCATION}
1248-
* */
1254+
* considering iceberg table properties {@code WRITE_DATA_LOCATION}, {@code OBJECT_STORE_PATH} and {@code WRITE_FOLDER_STORAGE_LOCATION}
1255+
*/
12491256
public static String dataLocation(Table icebergTable)
12501257
{
12511258
Map<String, String> properties = icebergTable.properties();
@@ -1261,4 +1268,22 @@ public static String dataLocation(Table icebergTable)
12611268
}
12621269
return dataLocation;
12631270
}
1271+
1272+
public static Long getSplitSize(Table table) {
1273+
return Long.parseLong(table.properties()
1274+
.getOrDefault(SPLIT_SIZE,
1275+
String.valueOf(SPLIT_SIZE_DEFAULT)));
1276+
}
1277+
1278+
public static DataSize getTargetSplitSize(long sessionValueProperty, long icebergScanTargetSplitSize)
1279+
{
1280+
return Optional.of(DataSize.succinctBytes(sessionValueProperty))
1281+
.filter(size -> !size.equals(DataSize.succinctBytes(0)))
1282+
.orElse(DataSize.succinctBytes(icebergScanTargetSplitSize));
1283+
}
1284+
1285+
public static DataSize getTargetSplitSize(ConnectorSession session, Scan<?, ?, ?> scan)
1286+
{
1287+
return getTargetSplitSize(IcebergSessionProperties.getTargetSplitSize(session), scan.targetSplitSize());
1288+
}
12641289
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.facebook.presto.iceberg.FileFormat;
1818
import com.facebook.presto.iceberg.IcebergColumnHandle;
1919
import com.facebook.presto.iceberg.IcebergSplit;
20+
import com.facebook.presto.iceberg.IcebergUtil;
2021
import com.facebook.presto.iceberg.PartitionData;
2122
import com.facebook.presto.spi.ConnectorSession;
2223
import com.facebook.presto.spi.ConnectorSplit;
@@ -144,7 +145,7 @@ private IcebergSplit splitFromContentScanTask(ContentScanTask<DataFile> task, Ch
144145
PartitionSpecParser.toJson(spec),
145146
partitionData.map(PartitionData::toJson),
146147
getNodeSelectionStrategy(session),
147-
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
148+
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / IcebergUtil.getTargetSplitSize(session, tableScan).toBytes(), minimumAssignedSplitWeight), 1.0)),
148149
ImmutableList.of(),
149150
Optional.of(new ChangelogSplitInfo(fromIcebergChangelogOperation(changeTask.operation()),
150151
changeTask.changeOrdinal(),

0 commit comments

Comments
 (0)