Skip to content

Commit d42702a

Browse files
committed
[Iceberg] Session property for target split size
1 parent 6d57079 commit d42702a

File tree

6 files changed

+52
-15
lines changed

6 files changed

+52
-15
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java

+11
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public final class IcebergSessionProperties
6565
public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight";
6666
public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold";
6767
public static final String STATISTICS_KLL_SKETCH_K_PARAMETER = "statistics_kll_sketch_k_parameter";
68+
public static final String TARGET_SPLIT_SIZE = "target_split_size";
6869

6970
private final List<PropertyMetadata<?>> sessionProperties;
7071

@@ -189,6 +190,11 @@ public IcebergSessionProperties(
189190
.add(integerProperty(STATISTICS_KLL_SKETCH_K_PARAMETER,
190191
"The K parameter for the Apache DataSketches KLL sketch when computing histogram statistics",
191192
icebergConfig.getStatisticsKllSketchKParameter(),
193+
false))
194+
.add(dataSizeSessionProperty(
195+
TARGET_SPLIT_SIZE,
196+
"The target split size. Set to 0 to use the iceberg table's read.split.target-size property",
197+
DataSize.succinctDataSize(256, DataSize.Unit.MEGABYTE),
192198
false));
193199

194200
nessieConfig.ifPresent((config) -> propertiesBuilder
@@ -323,4 +329,9 @@ public static int getStatisticsKllSketchKParameter(ConnectorSession session)
323329
{
324330
return session.getProperty(STATISTICS_KLL_SKETCH_K_PARAMETER, Integer.class);
325331
}
332+
333+
public static DataSize getTargetSplitSize(ConnectorSession session)
334+
{
335+
return session.getProperty(TARGET_SPLIT_SIZE, DataSize.class);
336+
}
326337
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
4848
import static com.facebook.presto.iceberg.IcebergUtil.getMetadataColumnConstraints;
4949
import static com.facebook.presto.iceberg.IcebergUtil.getNonMetadataColumnConstraints;
50+
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
5051
import static java.util.Objects.requireNonNull;
5152

5253
public class IcebergSplitManager
@@ -95,7 +96,7 @@ public ConnectorSplitSource getSplits(
9596
IncrementalChangelogScan scan = icebergTable.newIncrementalChangelogScan()
9697
.fromSnapshotExclusive(fromSnapshot)
9798
.toSnapshot(toSnapshot);
98-
return new ChangelogSplitSource(session, typeManager, icebergTable, scan, scan.targetSplitSize());
99+
return new ChangelogSplitSource(session, typeManager, icebergTable, scan, getTargetSplitSize(session, scan).toBytes());
99100
}
100101
else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
101102
CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
@@ -117,7 +118,7 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
117118
IcebergSplitSource splitSource = new IcebergSplitSource(
118119
session,
119120
tableScan,
120-
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
121+
TableScanUtil.splitFiles(tableScan.planFiles(), getTargetSplitSize(session, tableScan).toBytes()),
121122
getMinimumAssignedSplitWeight(session),
122123
getMetadataColumnConstraints(layoutHandle.getValidPredicate()));
123124
return splitSource;

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
4242
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
4343
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
44+
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
4445
import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
4546
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
4647
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -131,7 +132,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
131132
PartitionSpecParser.toJson(spec),
132133
partitionData.map(PartitionData::toJson),
133134
getNodeSelectionStrategy(session),
134-
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
135+
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / getTargetSplitSize(session, tableScan).toBytes(), minimumAssignedSplitWeight), 1.0)),
135136
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
136137
Optional.empty(),
137138
getDataSequenceNumber(task.file()));

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java

+25-11
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import com.google.common.collect.ImmutableMap;
5050
import com.google.common.collect.ImmutableSet;
5151
import com.google.common.collect.Sets;
52+
import io.airlift.units.DataSize;
5253
import org.apache.iceberg.BaseTable;
5354
import org.apache.iceberg.ContentFile;
5455
import org.apache.iceberg.ContentScanTask;
@@ -61,6 +62,7 @@
6162
import org.apache.iceberg.PartitionField;
6263
import org.apache.iceberg.PartitionSpec;
6364
import org.apache.iceberg.RowLevelOperationMode;
65+
import org.apache.iceberg.Scan;
6466
import org.apache.iceberg.Schema;
6567
import org.apache.iceberg.Snapshot;
6668
import org.apache.iceberg.SortOrder;
@@ -863,10 +865,10 @@ public static long getDataSequenceNumber(ContentFile<?> file)
863865
* @param requestedSchema If provided, only delete files with this schema will be provided
864866
*/
865867
public static CloseableIterable<DeleteFile> getDeleteFiles(Table table,
866-
long snapshot,
867-
TupleDomain<IcebergColumnHandle> filter,
868-
Optional<Set<Integer>> requestedPartitionSpec,
869-
Optional<Set<Integer>> requestedSchema)
868+
long snapshot,
869+
TupleDomain<IcebergColumnHandle> filter,
870+
Optional<Set<Integer>> requestedPartitionSpec,
871+
Optional<Set<Integer>> requestedSchema)
870872
{
871873
Expression filterExpression = toIcebergExpression(filter);
872874
CloseableIterable<FileScanTask> fileTasks = table.newScan().useSnapshot(snapshot).filter(filterExpression).planFiles();
@@ -1042,9 +1044,9 @@ private static class DeleteFilesIterator
10421044
private DeleteFile currentFile;
10431045

10441046
private DeleteFilesIterator(Map<Integer, PartitionSpec> partitionSpecsById,
1045-
CloseableIterator<FileScanTask> fileTasks,
1046-
Optional<Set<Integer>> requestedPartitionSpec,
1047-
Optional<Set<Integer>> requestedSchema)
1047+
CloseableIterator<FileScanTask> fileTasks,
1048+
Optional<Set<Integer>> requestedPartitionSpec,
1049+
Optional<Set<Integer>> requestedSchema)
10481050
{
10491051
this.partitionSpecsById = partitionSpecsById;
10501052
this.fileTasks = fileTasks;
@@ -1228,8 +1230,8 @@ public static Optional<PartitionData> partitionDataFromStructLike(PartitionSpec
12281230

12291231
/**
12301232
* Get the metadata location for target {@link Table},
1231-
* considering iceberg table properties {@code WRITE_METADATA_LOCATION}
1232-
* */
1233+
* considering iceberg table properties {@code WRITE_METADATA_LOCATION}
1234+
*/
12331235
public static String metadataLocation(Table icebergTable)
12341236
{
12351237
String metadataLocation = icebergTable.properties().get(TableProperties.WRITE_METADATA_LOCATION);
@@ -1244,8 +1246,8 @@ public static String metadataLocation(Table icebergTable)
12441246

12451247
/**
12461248
* Get the data location for target {@link Table},
1247-
* considering iceberg table properties {@code WRITE_DATA_LOCATION}, {@code OBJECT_STORE_PATH} and {@code WRITE_FOLDER_STORAGE_LOCATION}
1248-
* */
1249+
* considering iceberg table properties {@code WRITE_DATA_LOCATION}, {@code OBJECT_STORE_PATH} and {@code WRITE_FOLDER_STORAGE_LOCATION}
1250+
*/
12491251
public static String dataLocation(Table icebergTable)
12501252
{
12511253
Map<String, String> properties = icebergTable.properties();
@@ -1261,4 +1263,16 @@ public static String dataLocation(Table icebergTable)
12611263
}
12621264
return dataLocation;
12631265
}
1266+
1267+
public static DataSize getTargetSplitSize(long sessionValueProperty, long icebergScanTargetSplitSize)
1268+
{
1269+
return Optional.of(DataSize.succinctBytes(sessionValueProperty))
1270+
.filter(size -> !size.equals(DataSize.succinctBytes(0)))
1271+
.orElse(DataSize.succinctBytes(icebergScanTargetSplitSize));
1272+
}
1273+
1274+
public static DataSize getTargetSplitSize(ConnectorSession session, Scan<?, ?, ?> scan)
1275+
{
1276+
return getTargetSplitSize(IcebergSessionProperties.getTargetSplitSize(session).toBytes(), scan.targetSplitSize());
1277+
}
12641278
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.facebook.presto.iceberg.FileFormat;
1818
import com.facebook.presto.iceberg.IcebergColumnHandle;
1919
import com.facebook.presto.iceberg.IcebergSplit;
20+
import com.facebook.presto.iceberg.IcebergUtil;
2021
import com.facebook.presto.iceberg.PartitionData;
2122
import com.facebook.presto.spi.ConnectorSession;
2223
import com.facebook.presto.spi.ConnectorSplit;
@@ -144,7 +145,7 @@ private IcebergSplit splitFromContentScanTask(ContentScanTask<DataFile> task, Ch
144145
PartitionSpecParser.toJson(spec),
145146
partitionData.map(PartitionData::toJson),
146147
getNodeSelectionStrategy(session),
147-
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
148+
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / IcebergUtil.getTargetSplitSize(session, tableScan).toBytes(), minimumAssignedSplitWeight), 1.0)),
148149
ImmutableList.of(),
149150
Optional.of(new ChangelogSplitInfo(fromIcebergChangelogOperation(changeTask.operation()),
150151
changeTask.changeOrdinal(),

presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergUtil.java

+9
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@
4141
import static com.facebook.presto.iceberg.IcebergUtil.REAL_POSITIVE_INFINITE;
4242
import static com.facebook.presto.iceberg.IcebergUtil.REAL_POSITIVE_ZERO;
4343
import static com.facebook.presto.iceberg.IcebergUtil.getAdjacentValue;
44+
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
4445
import static java.lang.Double.longBitsToDouble;
4546
import static java.lang.Float.intBitsToFloat;
4647
import static org.assertj.core.api.Assertions.assertThat;
48+
import static org.testng.Assert.assertEquals;
4749

4850
public class TestIcebergUtil
4951
{
@@ -368,4 +370,11 @@ public void testNextValueForOtherType()
368370
encodeScaledValue(new BigDecimal(111111111111111123.45)), false))
369371
.isEmpty();
370372
}
373+
374+
@Test
375+
public void testGetTargetSplitSize()
376+
{
377+
assertEquals(1024, getTargetSplitSize(1024, 512).toBytes());
378+
assertEquals(512, getTargetSplitSize(0, 512).toBytes());
379+
}
371380
}

0 commit comments

Comments
 (0)