[Iceberg] Add table and session property for split size #24417

Merged — 1 commit, merged Feb 18, 2025
13 changes: 10 additions & 3 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -357,9 +357,9 @@ connector using a WITH clause:

The following table properties are available, which are specific to the Presto Iceberg connector:

======================================= =============================================================== ============
======================================= =============================================================== =========================
Property Name Description Default
======================================= =============================================================== ============
======================================= =============================================================== =========================
``format`` Optionally specifies the format of table data files, ``PARQUET``
either ``PARQUET`` or ``ORC``.

@@ -388,7 +388,11 @@ Property Name Description

``metrics_max_inferred_column`` Optionally specifies the maximum number of columns for which ``100``
metrics are collected.
======================================= =============================================================== ============

``read.split.target-size`` The target size for an individual split when generating splits ``134217728`` (128MB)
Contributor:
It seems the Presto convention does not use the prefixes from the Iceberg library. E.g. "write.metadata.metrics.max-inferred-column-defaults" was named just "metrics_max_inferred_column" as a table property. While I do think the prefixed naming is clearer, I think it's better to name this one "split_target_size" for now. Please also add an explanation here that it corresponds to the read.split.target-size table property in the Iceberg library.

Later on, we can send a proposal to use the full Iceberg property names, since that will affect users.

Contributor Author (@ZacBlanco) — Feb 8, 2025:
@hantangwangd and I have discussed this in another PR and decided that, moving forward, we will use the Iceberg property names for new properties. We will introduce backwards-compatible property names for the properties which were already introduced and slowly phase the old names out over the next few releases. I have filed this issue to address it: #24483. It includes the relevant context.

Contributor:
I agree that using the Iceberg names is clearer. But ideally we should make the changes for #24483 before directly using the new name. Who will be working on it, and when? Do you think you can send a PR for that soon?

Contributor Author:
Sure, I can try to get that PR out this week. I can definitely have it in soon, but since most users won't see any changes until the 292 release, I am in the camp that it would be fine not to hold up this PR to align the old property names.

Contributor:
Sounds good! It should be fine as long as it lands before the 292 release.

Contributor Author:
Here is a draft PR introducing the deprecation of the old table property names: #24581

It is still WIP; it just needs some tests.

Contributor:
> Here is a draft PR introducing the deprecation of the old table property names: #24581
> It is still WIP; it just needs some tests.

Great! Ping me when it's ready.

for a table scan. Generated splits may still be larger or
smaller than this value. Must be specified in bytes.
======================================= =============================================================== =========================

The table definition below specifies format ``ORC``, partitioning by columns ``c1`` and ``c2``,
and a file system location of ``s3://test_bucket/test_schema/test_table``:
@@ -421,6 +425,9 @@ Property Name Description
``iceberg.rows_for_metadata_optimization_threshold`` Overrides the behavior of the connector property
``iceberg.rows-for-metadata-optimization-threshold`` in the current
session.
``iceberg.target_split_size`` Overrides the target split size for all tables in a query in bytes.
Contributor:
In Hive, the counterpart is MAX_SPLIT_SIZE = "max_split_size". Can we rename this one to match Hive? Introducing different names for the same thing would confuse users. We could also name both "target_split_size"; the good thing about the "target_split_size" name is that it conforms to the Iceberg library, but "hive.max_split_size" is more accurate in that split sizes cannot exceed that number. For Iceberg it is also effectively the max split size, even though it's named the "target" split size. (See org/apache/iceberg/FixedSizeSplitScanTaskIterator.java)

Contributor Author:
Iceberg and Hive function differently; splits are not calculated in the same way. As such, the target size property does not refer to the absolute maximum size of a split. FixedSizeSplitScanTaskIterator only applies to files which don't support offsets. Parquet and ORC do support offsets, so they most likely won't use that class, but instead the OffsetsAwareSplitScanTaskIterator from the Iceberg library.

Either way, these are implementation details. If the property meant "max split size" it would be documented as such. This property does not represent the max split size and does not guarantee that splits will be below this size.
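
To make that concrete, here is a minimal sketch (my own illustration, not the Iceberg implementation) of offset-aware splitting: splits only close at row-group boundaries, so a single row group larger than the target produces a split larger than the target.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetSplitSketch
{
    // rowGroupSizes: byte lengths of the consecutive row groups in one data file
    static List<Long> planSplits(List<Long> rowGroupSizes, long targetSplitSize)
    {
        List<Long> splitSizes = new ArrayList<>();
        long current = 0;
        for (long rowGroup : rowGroupSizes) {
            // close the current split at a row-group boundary once the target would be exceeded
            if (current > 0 && current + rowGroup > targetSplitSize) {
                splitSizes.add(current);
                current = 0;
            }
            // a single row group larger than the target still becomes one (oversized) split
            current += rowGroup;
        }
        if (current > 0) {
            splitSizes.add(current);
        }
        return splitSizes;
    }

    public static void main(String[] args)
    {
        long target = 128L << 20; // 128 MB, matching the documented default of 134217728
        List<Long> rowGroups = List.of(90L << 20, 60L << 20, 200L << 20, 30L << 20);
        // prints [94371840, 62914560, 209715200, 31457280] -- the 200 MB split exceeds the target
        System.out.println(planSplits(rowGroups, target));
    }
}
```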

Contributor:
OK. In that case, can we add some clarification here: "Unlike hive.max_split_size, this can be smaller or greater than the actual split size"?

Contributor Author:
I've added this to the description for the table property. The session property refers to the table property, so I think it should be clear enough.
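
For reference, a quick usage sketch (illustrative only, not part of this diff; the catalog, schema, and table names are made up):

```sql
-- Per-table setting, stored as the Iceberg table property read.split.target-size (256 MB here)
ALTER TABLE iceberg.tpch.orders SET PROPERTIES ("read.split.target-size" = 268435456);

-- Per-query override in bytes; 0 (the default) falls back to each table's read.split.target-size
SET SESSION iceberg.target_split_size = 67108864;
```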

Set to 0 to use the value in each Iceberg table's
``read.split.target-size`` property.
===================================================== ======================================================================

Caching Support
IcebergAbstractMetadata.java
@@ -201,6 +201,7 @@
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;

public abstract class IcebergAbstractMetadata
@@ -719,6 +720,7 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable));
properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable));
properties.put(METRICS_MAX_INFERRED_COLUMN, IcebergUtil.getMetricsMaxInferredColumn(icebergTable));
properties.put(SPLIT_SIZE, IcebergUtil.getSplitSize(icebergTable));

SortOrder sortOrder = icebergTable.sortOrder();
// TODO: Support sort column transforms (https://github.com/prestodb/presto/issues/24250)
@@ -1127,6 +1129,9 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
case COMMIT_RETRIES:
updateProperties.set(TableProperties.COMMIT_NUM_RETRIES, String.valueOf(entry.getValue()));
break;
case SPLIT_SIZE:
updateProperties.set(TableProperties.SPLIT_SIZE, entry.getValue().toString());
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Updating property " + entry.getKey() + " is not supported currently");
}
IcebergSessionProperties.java
@@ -40,6 +40,7 @@
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;

public final class IcebergSessionProperties
@@ -65,6 +66,7 @@ public final class IcebergSessionProperties
public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight";
public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold";
public static final String STATISTICS_KLL_SKETCH_K_PARAMETER = "statistics_kll_sketch_k_parameter";
public static final String TARGET_SPLIT_SIZE = "target_split_size";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -189,6 +191,11 @@ public IcebergSessionProperties(
.add(integerProperty(STATISTICS_KLL_SKETCH_K_PARAMETER,
"The K parameter for the Apache DataSketches KLL sketch when computing histogram statistics",
icebergConfig.getStatisticsKllSketchKParameter(),
false))
.add(longProperty(
TARGET_SPLIT_SIZE,
"The target split size. Set to 0 to use the iceberg table's read.split.target-size property",
0L,
false));

nessieConfig.ifPresent((config) -> propertiesBuilder
@@ -323,4 +330,9 @@ public static int getStatisticsKllSketchKParameter(ConnectorSession session)
{
return session.getProperty(STATISTICS_KLL_SKETCH_K_PARAMETER, Integer.class);
}

public static Long getTargetSplitSize(ConnectorSession session)
{
return session.getProperty(TARGET_SPLIT_SIZE, Long.class);
}
}
IcebergSplitManager.java
@@ -31,7 +31,6 @@
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

@@ -41,7 +40,6 @@
import java.util.concurrent.ThreadPoolExecutor;

import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
@@ -95,7 +93,7 @@ public ConnectorSplitSource getSplits(
IncrementalChangelogScan scan = icebergTable.newIncrementalChangelogScan()
.fromSnapshotExclusive(fromSnapshot)
.toSnapshot(toSnapshot);
return new ChangelogSplitSource(session, typeManager, icebergTable, scan, scan.targetSplitSize());
return new ChangelogSplitSource(session, typeManager, icebergTable, scan);
}
else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
@@ -117,8 +115,6 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
IcebergSplitSource splitSource = new IcebergSplitSource(
session,
tableScan,
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
getMinimumAssignedSplitWeight(session),
getMetadataColumnConstraints(layoutHandle.getValidPredicate()));
return splitSource;
}
IcebergSplitSource.java
@@ -20,13 +20,13 @@
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;

import java.io.IOException;
@@ -39,40 +39,45 @@

import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterators.limit;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.iceberg.util.TableScanUtil.splitFiles;

public class IcebergSplitSource
implements ConnectorSplitSource
{
private CloseableIterator<FileScanTask> fileScanTaskIterator;

private final TableScan tableScan;
private final Closer closer = Closer.create();
private final double minimumAssignedSplitWeight;
private final ConnectorSession session;
private final long targetSplitSize;
private final NodeSelectionStrategy nodeSelectionStrategy;

private final TupleDomain<IcebergColumnHandle> metadataColumnConstraints;

public IcebergSplitSource(
ConnectorSession session,
TableScan tableScan,
CloseableIterable<FileScanTask> fileScanTaskIterable,
double minimumAssignedSplitWeight,
TupleDomain<IcebergColumnHandle> metadataColumnConstraints)
{
this.session = requireNonNull(session, "session is null");
this.tableScan = requireNonNull(tableScan, "tableScan is null");
this.fileScanTaskIterator = fileScanTaskIterable.iterator();
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
requireNonNull(session, "session is null");
this.metadataColumnConstraints = requireNonNull(metadataColumnConstraints, "metadataColumnConstraints is null");
closer.register(fileScanTaskIterator);
this.targetSplitSize = getTargetSplitSize(session, tableScan).toBytes();
this.minimumAssignedSplitWeight = getMinimumAssignedSplitWeight(session);
this.nodeSelectionStrategy = getNodeSelectionStrategy(session);
this.fileScanTaskIterator = closer.register(
splitFiles(
closer.register(tableScan.planFiles()),
targetSplitSize)
.iterator());
}

@Override
@@ -130,8 +135,8 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
getPartitionKeys(task),
PartitionSpecParser.toJson(spec),
partitionData.map(PartitionData::toJson),
getNodeSelectionStrategy(session),
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
nodeSelectionStrategy,
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / targetSplitSize, minimumAssignedSplitWeight), 1.0)),
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
Optional.empty(),
getDataSequenceNumber(task.file()));
IcebergTableProperties.java
@@ -29,6 +29,7 @@
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.longProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Locale.ENGLISH;
@@ -47,6 +48,7 @@ public class IcebergTableProperties
public static final String METADATA_PREVIOUS_VERSIONS_MAX = "metadata_previous_versions_max";
public static final String METADATA_DELETE_AFTER_COMMIT = "metadata_delete_after_commit";
public static final String METRICS_MAX_INFERRED_COLUMN = "metrics_max_inferred_column";
public static final String TARGET_SPLIT_SIZE = TableProperties.SPLIT_SIZE;
private static final String DEFAULT_FORMAT_VERSION = "2";

private final List<PropertyMetadata<?>> tableProperties;
@@ -133,6 +135,10 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
false,
value -> RowLevelOperationMode.fromName((String) value),
RowLevelOperationMode::modeName))
.add(longProperty(TARGET_SPLIT_SIZE,
"Desired size of split to generate during query scan planning",
TableProperties.SPLIT_SIZE_DEFAULT,
false))
.build();

columnProperties = ImmutableList.of(stringProperty(
@@ -210,4 +216,9 @@ public static RowLevelOperationMode getUpdateMode(Map<String, Object> tablePrope
{
return (RowLevelOperationMode) tableProperties.get(UPDATE_MODE);
}

public static Long getTargetSplitSize(Map<String, Object> tableProperties)
{
return (Long) tableProperties.get(TableProperties.SPLIT_SIZE);
}
}
IcebergUtil.java
@@ -49,6 +49,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.units.DataSize;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
@@ -61,6 +62,7 @@
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
@@ -157,6 +159,7 @@
import static com.google.common.collect.Streams.stream;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.Double.doubleToRawLongBits;
import static java.lang.Double.longBitsToDouble;
import static java.lang.Double.parseDouble;
@@ -195,6 +198,8 @@
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
@@ -1176,6 +1181,9 @@ public static Map<String, String> populateTableProperties(ConnectorTableMetadata

Integer metricsMaxInferredColumn = IcebergTableProperties.getMetricsMaxInferredColumn(tableMetadata.getProperties());
propertiesBuilder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS, String.valueOf(metricsMaxInferredColumn));

propertiesBuilder.put(SPLIT_SIZE, String.valueOf(IcebergTableProperties.getTargetSplitSize(tableMetadata.getProperties())));

return propertiesBuilder.build();
}

@@ -1286,4 +1294,23 @@ public static String dataLocation(Table icebergTable)
}
return dataLocation;
}

public static Long getSplitSize(Table table)
{
return Long.parseLong(table.properties()
.getOrDefault(SPLIT_SIZE,
String.valueOf(SPLIT_SIZE_DEFAULT)));
}

public static DataSize getTargetSplitSize(long sessionValueProperty, long icebergScanTargetSplitSize)
{
return sessionValueProperty == 0 ?
succinctBytes(icebergScanTargetSplitSize) :
succinctBytes(sessionValueProperty);
}

public static DataSize getTargetSplitSize(ConnectorSession session, Scan<?, ?, ?> scan)
{
return getTargetSplitSize(IcebergSessionProperties.getTargetSplitSize(session), scan.targetSplitSize());
}
}