|
20 | 20 | import com.facebook.presto.spi.ConnectorSplitSource;
|
21 | 21 | import com.facebook.presto.spi.SplitWeight;
|
22 | 22 | import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
|
| 23 | +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; |
23 | 24 | import com.google.common.collect.ImmutableList;
|
24 | 25 | import com.google.common.io.Closer;
|
25 | 26 | import org.apache.iceberg.FileScanTask;
|
26 | 27 | import org.apache.iceberg.PartitionSpec;
|
27 | 28 | import org.apache.iceberg.PartitionSpecParser;
|
28 | 29 | import org.apache.iceberg.TableScan;
|
29 |
| -import org.apache.iceberg.io.CloseableIterable; |
30 | 30 | import org.apache.iceberg.io.CloseableIterator;
|
31 | 31 |
|
32 | 32 | import java.io.IOException;
|
|
39 | 39 |
|
40 | 40 | import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
|
41 | 41 | import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
|
| 42 | +import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight; |
42 | 43 | import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
|
43 | 44 | import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
|
| 45 | +import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize; |
44 | 46 | import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
|
45 | 47 | import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
|
46 | 48 | import static com.google.common.collect.ImmutableList.toImmutableList;
|
47 | 49 | import static com.google.common.collect.Iterators.limit;
|
48 | 50 | import static java.util.Objects.requireNonNull;
|
49 | 51 | import static java.util.concurrent.CompletableFuture.completedFuture;
|
| 52 | +import static org.apache.iceberg.util.TableScanUtil.splitFiles; |
50 | 53 |
|
51 | 54 | public class IcebergSplitSource
|
52 | 55 | implements ConnectorSplitSource
|
53 | 56 | {
|
54 | 57 | private CloseableIterator<FileScanTask> fileScanTaskIterator;
|
55 | 58 |
|
56 |
| - private final TableScan tableScan; |
57 | 59 | private final Closer closer = Closer.create();
|
58 | 60 | private final double minimumAssignedSplitWeight;
|
59 |
| - private final ConnectorSession session; |
| 61 | + private final long targetSplitSize; |
| 62 | + private final NodeSelectionStrategy nodeSelectionStrategy; |
60 | 63 |
|
61 | 64 | private final TupleDomain<IcebergColumnHandle> metadataColumnConstraints;
|
62 | 65 |
|
63 | 66 | public IcebergSplitSource(
|
64 | 67 | ConnectorSession session,
|
65 | 68 | TableScan tableScan,
|
66 |
| - CloseableIterable<FileScanTask> fileScanTaskIterable, |
67 |
| - double minimumAssignedSplitWeight, |
68 | 69 | TupleDomain<IcebergColumnHandle> metadataColumnConstraints)
|
69 | 70 | {
|
70 |
| - this.session = requireNonNull(session, "session is null"); |
71 |
| - this.tableScan = requireNonNull(tableScan, "tableScan is null"); |
72 |
| - this.fileScanTaskIterator = fileScanTaskIterable.iterator(); |
73 |
| - this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; |
| 71 | + requireNonNull(session, "session is null"); |
74 | 72 | this.metadataColumnConstraints = requireNonNull(metadataColumnConstraints, "metadataColumnConstraints is null");
|
75 |
| - closer.register(fileScanTaskIterator); |
| 73 | + this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes(); |
| 74 | + this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session); |
| 75 | + this.nodeSelectionStrategy = getNodeSelectionStrategy(session); |
| 76 | + this.fileScanTaskIterator = closer.register( |
| 77 | + splitFiles( |
| 78 | + closer.register(tableScan.planFiles()), |
| 79 | + targetSplitSize) |
| 80 | + .iterator()); |
76 | 81 | }
|
77 | 82 |
|
78 | 83 | @Override
|
@@ -130,8 +135,8 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
|
130 | 135 | getPartitionKeys(task),
|
131 | 136 | PartitionSpecParser.toJson(spec),
|
132 | 137 | partitionData.map(PartitionData::toJson),
|
133 |
| - getNodeSelectionStrategy(session), |
134 |
| - SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)), |
| 138 | + nodeSelectionStrategy, |
| 139 | + SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / targetSplitSize, minimumAssignedSplitWeight), 1.0)), |
135 | 140 | task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
|
136 | 141 | Optional.empty(),
|
137 | 142 | getDataSequenceNumber(task.file()));
|
|
0 commit comments