diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 774747ea60cdd..d06ae500e6c97 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -1414,6 +1414,30 @@ For example, ``DESCRIBE`` from the partitioned Iceberg table ``customer``: comment | varchar | | (3 rows) +UPDATE +^^^^^^ + +The Iceberg connector supports :doc:`../sql/update` operations on Iceberg +tables. Updates are supported only on tables that meet two requirements: the +table format must be at least version 2, and the table property +``write.update.mode`` must be set to ``merge-on-read``. + +.. code-block:: sql + + UPDATE region SET name = 'EU', comment = 'Europe' WHERE regionkey = 1; + +.. code-block:: text + + UPDATE: 1 row + + Query 20250204_010341_00021_ymwi5, FINISHED, 2 nodes + +The query returns an error if the table does not meet the requirements for +updates. + +.. code-block:: text + + Query 20250204_010445_00022_ymwi5 failed: Iceberg table updates require at least format version 2 and update mode must be merge-on-read + Schema Evolution ---------------- diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ColumnIdentity.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ColumnIdentity.java index ee1f2998488a3..b5bf5a9eca9fe 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ColumnIdentity.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ColumnIdentity.java @@ -103,7 +103,7 @@ public int hashCode() @Override public String toString() { - return id + ":" + name; + return id + ":" + name + ":" + typeCategory + ":" + children; } public enum TypeCategory diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CommitTaskData.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CommitTaskData.java index 37f83c855ce25..f8a625a450f3b 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CommitTaskData.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/CommitTaskData.java @@ -29,6 +29,7 @@ public class CommitTaskData private final Optional partitionDataJson; private final FileFormat fileFormat; private final Optional referencedDataFile; + private final FileContent content; @JsonCreator public CommitTaskData( @@ -38,7 +39,8 @@ public CommitTaskData( @JsonProperty("partitionSpecJson") int partitionSpecId, @JsonProperty("partitionDataJson") Optional partitionDataJson, @JsonProperty("fileFormat") FileFormat fileFormat, - @JsonProperty("referencedDataFile") String referencedDataFile) + @JsonProperty("referencedDataFile") String referencedDataFile, + @JsonProperty("content") FileContent content) { this.path = requireNonNull(path, "path is null"); this.fileSizeInBytes = fileSizeInBytes; @@ -47,6 +49,7 @@ public CommitTaskData( this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null"); this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); this.referencedDataFile = Optional.ofNullable(referencedDataFile); + this.content = requireNonNull(content, "content is null"); } @JsonProperty @@ -90,4 +93,10 @@ public Optional getReferencedDataFile() { return referencedDataFile; } + + @JsonProperty + public FileContent getContent() + { + return content; + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ConnectorPageSourceWithRowPositions.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ConnectorPageSourceWithRowPositions.java new file mode 100644 index
0000000000000..f8e56b952acce --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ConnectorPageSourceWithRowPositions.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.spi.ConnectorPageSource; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class ConnectorPageSourceWithRowPositions +{ + private final ConnectorPageSource delegate; + private final Optional startRowPosition; + private final Optional endRowPosition; + + public ConnectorPageSourceWithRowPositions( + ConnectorPageSource delegate, + Optional startRowPosition, + Optional endRowPosition) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.startRowPosition = requireNonNull(startRowPosition, "startRowPosition is null"); + this.endRowPosition = requireNonNull(endRowPosition, "endRowPosition is null"); + } + + public ConnectorPageSource getDelegate() + { + return delegate; + } + + public Optional getStartRowPosition() + { + return startRowPosition; + } + + public Optional getEndRowPosition() + { + return endRowPosition; + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/FileFormat.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/FileFormat.java index e06a80f5b3cc3..df4700bc8db80 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/FileFormat.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/FileFormat.java @@ -67,4 +67,23 @@ public static FileFormat fromIcebergFileFormat(org.apache.iceberg.FileFormat for return prestoFileFormat; } + + public org.apache.iceberg.FileFormat toIceberg() + { + org.apache.iceberg.FileFormat fileFormat; + switch (this) { + case ORC: + fileFormat = org.apache.iceberg.FileFormat.ORC; + break; + case PARQUET: + fileFormat = org.apache.iceberg.FileFormat.PARQUET; + break; + case AVRO: + fileFormat = org.apache.iceberg.FileFormat.AVRO; + break; + default: + throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + this); + } + return fileFormat; + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index 4ae4aebb15cfd..c6855be88a022 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -26,6 +26,7 @@ import com.facebook.presto.hive.HivePartition; import com.facebook.presto.hive.HiveWrittenPartitions; import com.facebook.presto.hive.NodeVersion; +import com.facebook.presto.iceberg.changelog.ChangelogOperation; import com.facebook.presto.iceberg.changelog.ChangelogUtil; import com.facebook.presto.iceberg.statistics.StatisticsFileCache; import com.facebook.presto.spi.ColumnHandle; @@ -70,12 +71,12 @@ import com.google.common.collect.Maps; import io.airlift.slice.Slice; 
import org.apache.hadoop.fs.Path; -import org.apache.iceberg.AppendFiles; import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.IsolationLevel; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.MetricsModes.None; import org.apache.iceberg.PartitionField; @@ -91,9 +92,11 @@ import org.apache.iceberg.UpdateProperties; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.view.View; @@ -112,6 +115,9 @@ import java.util.stream.Collectors; import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; +import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; +import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED; +import static com.facebook.presto.hive.HiveUtil.PRESTO_QUERY_ID; import static com.facebook.presto.hive.MetadataUtils.getCombinedRemainingPredicate; import static com.facebook.presto.hive.MetadataUtils.getDiscretePredicates; import static com.facebook.presto.hive.MetadataUtils.getPredicate; @@ -121,9 +127,11 @@ import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_METADATA; import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE; import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_METADATA; +import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER; import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH; +import static com.facebook.presto.iceberg.IcebergMetadataColumn.UPDATE_ROW_DATA; import static com.facebook.presto.iceberg.IcebergPartitionType.ALL; import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled; @@ -169,6 +177,9 @@ import static com.facebook.presto.iceberg.TableStatisticsMaker.getSupportedColumnStatistics; import static com.facebook.presto.iceberg.TypeConverter.toIcebergType; import static com.facebook.presto.iceberg.TypeConverter.toPrestoType; +import static com.facebook.presto.iceberg.changelog.ChangelogOperation.INSERT; +import static com.facebook.presto.iceberg.changelog.ChangelogOperation.UPDATE_AFTER; +import static com.facebook.presto.iceberg.changelog.ChangelogOperation.UPDATE_BEFORE; import static com.facebook.presto.iceberg.changelog.ChangelogUtil.getRowTypeFromColumnMeta; import static com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizer.getEnforcedColumns; import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateBaseTableStatistics; @@ -184,9 +195,13 @@ import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; import static org.apache.iceberg.MetadataColumns.ROW_POSITION; +import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ; import static 
org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP; +import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL; +import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT; +import static org.apache.iceberg.TableProperties.UPDATE_MODE; public abstract class IcebergAbstractMetadata implements ConnectorMetadata @@ -481,7 +496,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe @Override public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) { - return finishWrite((IcebergOutputTableHandle) tableHandle, fragments); + return finishWrite(session, (IcebergOutputTableHandle) tableHandle, fragments, INSERT); } protected ConnectorInsertTableHandle beginIcebergTableInsert(ConnectorSession session, IcebergTableHandle table, Table icebergTable) @@ -528,10 +543,10 @@ public static List getSupportedSortFields(Schema schema, SortOrder so @Override public Optional finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) { - return finishWrite((IcebergInsertTableHandle) insertHandle, fragments); + return finishWrite(session, (IcebergInsertTableHandle) insertHandle, fragments, INSERT); } - private Optional finishWrite(IcebergWritableTableHandle writableTableHandle, Collection fragments) + private Optional finishWrite(ConnectorSession session, IcebergWritableTableHandle writableTableHandle, Collection fragments, ChangelogOperation operationType) { if (fragments.isEmpty()) { transaction.commitTransaction(); @@ -544,40 +559,99 @@ private Optional finishWrite(IcebergWritableTableHandle .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) .collect(toImmutableList()); - Type[] partitionColumnTypes = icebergTable.spec().fields().stream() - .map(field -> field.transform().getResultType( - icebergTable.schema().findType(field.sourceId()))) - .toArray(Type[]::new); + RowDelta rowDelta = transaction.newRowDelta(); + writableTableHandle.getTableName().getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId())); + IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT)); - AppendFiles appendFiles = transaction.newFastAppend(); - for (CommitTaskData task : commitTasks) { - DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) - .withPath(task.getPath()) - .withFileSizeInBytes(task.getFileSizeInBytes()) - .withFormat(FileFormat.fromString(writableTableHandle.getFileFormat().name())) - .withMetrics(task.getMetrics().metrics()); + ImmutableSet.Builder writtenFiles = ImmutableSet.builder(); + ImmutableSet.Builder referencedDataFiles = ImmutableSet.builder(); + commitTasks.forEach(task -> handleTask(task, icebergTable, rowDelta, writtenFiles, referencedDataFiles)); - if (!icebergTable.spec().fields().isEmpty()) { - String partitionDataJson = task.getPartitionDataJson() - .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); - builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); - } + rowDelta.validateDataFilesExist(referencedDataFiles.build()); + if (isolationLevel == IsolationLevel.SERIALIZABLE) { + 
rowDelta.validateNoConflictingDataFiles(); + } - appendFiles.appendFile(builder.build()); + // Ensure a row that is updated by this commit was not deleted by a separate commit + if (operationType == UPDATE_BEFORE || operationType == UPDATE_AFTER) { + rowDelta.validateDeletedFiles(); + rowDelta.validateNoConflictingDeleteFiles(); } - appendFiles.commit(); - transaction.commitTransaction(); + try { + rowDelta.set(PRESTO_QUERY_ID, session.getQueryId()); + rowDelta.commit(); + transaction.commitTransaction(); + } + catch (ValidationException e) { + log.error(e, "ValidationException in finishWrite"); + throw new PrestoException(ICEBERG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + writableTableHandle.getTableName(), e); + } return Optional.of(new HiveWrittenPartitions(commitTasks.stream() .map(CommitTaskData::getPath) .collect(toImmutableList()))); } + private void handleTask(CommitTaskData task, Table icebergTable, RowDelta rowDelta, ImmutableSet.Builder writtenFiles, ImmutableSet.Builder referencedDataFiles) + { + PartitionSpec partitionSpec = icebergTable.specs().get(task.getPartitionSpecId()); + Type[] partitionColumnTypes = partitionSpec.fields().stream() + .map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId()))) + .toArray(Type[]::new); + switch (task.getContent()) { + case POSITION_DELETES: + handleFinishPositionDeletes(task, partitionSpec, partitionColumnTypes, rowDelta, writtenFiles, referencedDataFiles); + break; + case DATA: + handleFinishData(task, icebergTable, partitionSpec, partitionColumnTypes, rowDelta, writtenFiles, referencedDataFiles); + break; + default: + throw new UnsupportedOperationException("Unsupported task content: " + task.getContent()); + } + } + + private void handleFinishPositionDeletes(CommitTaskData task, PartitionSpec partitionSpec, Type[] partitionColumnTypes, RowDelta rowDelta, ImmutableSet.Builder writtenFiles, ImmutableSet.Builder referencedDataFiles) + { + FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec) + .withPath(task.getPath()) + .withFormat(task.getFileFormat().toIceberg()) + .ofPositionDeletes() + .withFileSizeInBytes(task.getFileSizeInBytes()) + .withMetrics(task.getMetrics().metrics()); + + if (!partitionSpec.fields().isEmpty()) { + String partitionDataJson = task.getPartitionDataJson() + .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); + } + + rowDelta.addDeletes(deleteBuilder.build()); + writtenFiles.add(task.getPath()); + task.getReferencedDataFile().ifPresent(referencedDataFiles::add); + } + + private void handleFinishData(CommitTaskData task, Table icebergTable, PartitionSpec partitionSpec, Type[] partitionColumnTypes, RowDelta rowDelta, ImmutableSet.Builder writtenFiles, ImmutableSet.Builder referencedDataFiles) + { + DataFiles.Builder builder = DataFiles.builder(partitionSpec) + .withPath(task.getPath()) + .withFormat(task.getFileFormat().toIceberg()) + .withFileSizeInBytes(task.getFileSizeInBytes()) + .withMetrics(task.getMetrics().metrics()); + + if (!icebergTable.spec().fields().isEmpty()) { + String partitionDataJson = task.getPartitionDataJson() + .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes)); + } + rowDelta.addRows(builder.build()); + writtenFiles.add(task.getPath()); + } + @Override public 
ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) { - return IcebergColumnHandle.create(ROW_POSITION, typeManager, IcebergColumnHandle.ColumnType.REGULAR); + return IcebergColumnHandle.create(ROW_POSITION, typeManager, REGULAR); } @Override @@ -641,6 +715,7 @@ protected ImmutableMap createMetadataProperties(Table icebergTab } properties.put(DELETE_MODE, IcebergUtil.getDeleteMode(icebergTable)); + properties.put(UPDATE_MODE, IcebergUtil.getUpdateMode(icebergTable)); properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable)); properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable)); properties.put(METRICS_MAX_INFERRED_COLUMN, IcebergUtil.getMetricsMaxInferredColumn(icebergTable)); @@ -670,14 +745,14 @@ protected ImmutableMap createViewMetadataProperties(View view) public static Schema toIcebergSchema(List columns) { - List icebergColumns = new ArrayList<>(); + List icebergColumns = new ArrayList<>(); for (ColumnMetadata column : columns) { if (!column.isHidden()) { int index = icebergColumns.size(); Type type = toIcebergType(column.getType()); - Types.NestedField field = column.isNullable() - ? Types.NestedField.optional(index, column.getName(), type, column.getComment()) - : Types.NestedField.required(index, column.getName(), type, column.getComment()); + NestedField field = column.isNullable() + ? NestedField.optional(index, column.getName(), type, column.getComment()) + : NestedField.required(index, column.getName(), type, column.getComment()); icebergColumns.add(field); } } @@ -867,7 +942,8 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa tableSchemaJson, Optional.empty(), Optional.empty(), - getSortFields(table)); + getSortFields(table), + ImmutableList.of()); } @Override @@ -1120,4 +1196,69 @@ else if (tableVersion.getVersionExpressionType() instanceof VarcharType) { } throw new PrestoException(NOT_SUPPORTED, "Unsupported table version type: " + tableVersion.getVersionType()); } + + /** + * The row ID update column handle is a struct type containing the row position plus the + * columns of the table that the query does not modify. + * + * @return A column handle for the row ID update column. + */ + @Override + public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle, List updatedColumns) + { + List unmodifiedColumns = new ArrayList<>(); + unmodifiedColumns.add(ROW_POSITION); + // Include all the non-updated columns. These are needed when writing the new data file with updated column values.
+ IcebergTableHandle table = (IcebergTableHandle) tableHandle; + Set updatedFields = updatedColumns.stream() + .map(IcebergColumnHandle.class::cast) + .map(IcebergColumnHandle::getId) + .collect(toImmutableSet()); + for (NestedField column : SchemaParser.fromJson(table.getTableSchemaJson().get()).columns()) { + if (!updatedFields.contains(column.fieldId())) { + unmodifiedColumns.add(column); + } + } + NestedField field = NestedField.required(UPDATE_ROW_DATA.getId(), UPDATE_ROW_DATA.getColumnName(), Types.StructType.of(unmodifiedColumns)); + return IcebergColumnHandle.create(field, typeManager, SYNTHESIZED); + } + + @Override + public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, List updatedColumns) + { + IcebergTableHandle handle = (IcebergTableHandle) tableHandle; + Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); + int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); + if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE || + !Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE)) + .map(mode -> mode.equals(MERGE_ON_READ.modeName())) + .orElse(false)) { + throw new RuntimeException("Iceberg table updates require at least format version 2 and update mode must be merge-on-read"); + } + validateTableMode(session, icebergTable); + transaction = icebergTable.newTransaction(); + return handle + .withUpdatedColumns(updatedColumns.stream() + .map(IcebergColumnHandle.class::cast) + .collect(toImmutableList())); + } + + @Override + public void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, Collection fragments) + { + IcebergTableHandle handle = (IcebergTableHandle) tableHandle; + Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); + IcebergOutputTableHandle outputTableHandle = new IcebergOutputTableHandle( + handle.getSchemaName(), + handle.getIcebergTableName(), + toPrestoSchema(icebergTable.schema(), typeManager), + toPrestoPartitionSpec(icebergTable.spec(), typeManager), + handle.getUpdatedColumns(), + icebergTable.location(), + getFileFormat(icebergTable), + getCompressionCodec(session), + icebergTable.properties(), + handle.getSortOrder()); + finishWrite(session, outputTableHandle, fragments, UPDATE_AFTER); + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java index 37f4f4894026c..fa912f60a3f8c 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java @@ -35,6 +35,7 @@ import static com.facebook.presto.iceberg.ColumnIdentity.primitiveColumnIdentity; import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER; import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH; +import static com.facebook.presto.iceberg.IcebergMetadataColumn.UPDATE_ROW_DATA; import static com.facebook.presto.iceberg.TypeConverter.toPrestoType; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Iterables.getOnlyElement; @@ -96,6 +97,12 @@ public boolean isRowPositionColumn() return columnIdentity.getId() == ROW_POSITION.fieldId(); } + @JsonIgnore + public boolean isUpdateRowIdColumn() + { + return columnIdentity.getId() == UPDATE_ROW_DATA.getId(); + } + @Override public ColumnHandle 
withRequiredSubfields(List subfields) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java index ac4c88e0c1f30..c833a65eaf62f 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java @@ -39,7 +39,8 @@ public enum IcebergErrorCode ICEBERG_ROLLBACK_ERROR(13, EXTERNAL), ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR), ICEBERG_UNKNOWN_MANIFEST_TYPE(15, EXTERNAL), - ICEBERG_COMMIT_ERROR(16, EXTERNAL); + ICEBERG_COMMIT_ERROR(16, EXTERNAL), + ICEBERG_MISSING_COLUMN(17, EXTERNAL); private final ErrorCode errorCode; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataColumn.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataColumn.java index a3e3d3e258cb1..9758f325338c7 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataColumn.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataColumn.java @@ -13,21 +13,30 @@ */ package com.facebook.presto.iceberg; +import com.facebook.presto.common.type.RowType; import com.facebook.presto.common.type.Type; +import com.google.common.collect.ImmutableList; import org.apache.iceberg.MetadataColumns; import java.util.Set; import java.util.stream.Stream; import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.UnknownType.UNKNOWN; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE; +import static com.facebook.presto.iceberg.ColumnIdentity.TypeCategory.STRUCT; import static com.google.common.collect.ImmutableSet.toImmutableSet; public enum IcebergMetadataColumn { FILE_PATH(MetadataColumns.FILE_PATH.fieldId(), "$path", VARCHAR, PRIMITIVE), DATA_SEQUENCE_NUMBER(Integer.MAX_VALUE - 1001, "$data_sequence_number", BIGINT, PRIMITIVE), + /** + * Iceberg reserved row ids begin at Integer.MAX_VALUE and count down. Starting at Integer.MIN_VALUE here to avoid conflicts. + * Inner type for row is not known until runtime.
+ */ + UPDATE_ROW_DATA(Integer.MIN_VALUE, "$row_id", RowType.anonymous(ImmutableList.of(UNKNOWN)), STRUCT) /**/; private static final Set COLUMN_IDS = Stream.of(values()) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java index 43101529e64fc..e526ff13332a8 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java @@ -70,6 +70,7 @@ import static com.facebook.presto.common.type.Decimals.readBigDecimal; import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf; +import static com.facebook.presto.iceberg.FileContent.DATA; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_TOO_MANY_OPEN_PARTITIONS; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR; @@ -107,13 +108,13 @@ public class IcebergPageSink private final ConnectorSession session; private final FileFormat fileFormat; private final PagePartitioner pagePartitioner; + private final Table table; private final List writers = new ArrayList<>(); private long writtenBytes; private long systemMemoryUsage; private long validationCpuNanos; - private Table table; private final List sortOrder; private final Path tempDirectory; @@ -221,7 +222,8 @@ public CompletableFuture> finish() partitionSpec.specId(), context.getPartitionData().map(PartitionData::toJson), fileFormat, - null); + null, + DATA); commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task))); } @@ -521,8 +523,8 @@ private static class PagePartitioner private Page transformedPage; public PagePartitioner(PageIndexerFactory pageIndexerFactory, - List columns, - ConnectorSession session) + List columns, + ConnectorSession session) { this.pageIndexer = pageIndexerFactory.createPageIndexer(columns.stream() .map(PartitionColumn::getResultType) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java index 0dd5b46f15eb6..ca982e29cfc54 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java @@ -73,6 +73,7 @@ import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.PageIndexerFactory; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SplitContext; import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; @@ -89,10 +90,14 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockMissingException; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Table; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types.NestedField; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.crypto.InternalFileDecryptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -110,7 +115,6 @@ import 
java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -122,7 +126,6 @@ import java.util.stream.IntStream; import static com.facebook.presto.common.type.VarcharType.VARCHAR; -import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED; import static com.facebook.presto.hive.CacheQuota.NO_CACHE_CONSTRAINTS; @@ -146,9 +149,12 @@ import static com.facebook.presto.iceberg.IcebergColumnHandle.isPushedDownSubfield; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT; +import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_MISSING_COLUMN; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_MISSING_DATA; import static com.facebook.presto.iceberg.IcebergOrcColumn.ROOT_COLUMN_ID; +import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getLocationProvider; +import static com.facebook.presto.iceberg.IcebergUtil.getShallowWrappedIcebergTable; import static com.facebook.presto.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY; import static com.facebook.presto.iceberg.TypeConverter.toHiveType; import static com.facebook.presto.iceberg.delete.EqualityDeleteFilter.readEqualityDeletes; @@ -173,6 +179,7 @@ import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Maps.uniqueIndex; import static io.airlift.slice.Slices.utf8Slice; import static java.lang.String.format; @@ -197,8 +204,10 @@ public class IcebergPageSourceProvider private final HiveClientConfig hiveClientConfig; private final IcebergFileWriterFactory fileWriterFactory; private final JsonCodec jsonCodec; - private final ParquetMetadataSource parquetMetadataSource; + private final PageIndexerFactory pageIndexerFactory; + private final int maxOpenPartitions; + private final SortParameters sortParameters; @Inject public IcebergPageSourceProvider( @@ -211,7 +220,10 @@ public IcebergPageSourceProvider( HiveClientConfig hiveClientConfig, ParquetMetadataSource parquetMetadataSource, IcebergFileWriterFactory fileWriterFactory, - JsonCodec jsonCodec) + JsonCodec jsonCodec, + PageIndexerFactory pageIndexerFactory, + IcebergConfig icebergConfig, + SortParameters sortParameters) { this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null"); @@ -223,6 +235,10 @@ public IcebergPageSourceProvider( this.parquetMetadataSource = requireNonNull(parquetMetadataSource, "parquetMetadataSource is null"); this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null"); this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); + this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null"); + requireNonNull(icebergConfig, "icebergConfig is null"); + this.maxOpenPartitions = icebergConfig.getMaxPartitionsPerWriter(); + this.sortParameters = 
requireNonNull(sortParameters, "sortParameters is null"); } private static ConnectorPageSourceWithRowPositions createParquetPageSource( @@ -337,12 +353,10 @@ private static ConnectorPageSourceWithRowPositions createParquetPageSource( for (int columnIndex = 0; columnIndex < regularColumns.size(); columnIndex++) { IcebergColumnHandle column = regularColumns.get(columnIndex); namesBuilder.add(column.getName()); - Type prestoType = column.getType(); - prestoTypes.add(prestoType); - if (column.getColumnType() == IcebergColumnHandle.ColumnType.SYNTHESIZED) { + if (column.getColumnType() == IcebergColumnHandle.ColumnType.SYNTHESIZED && !column.isUpdateRowIdColumn()) { Subfield pushedDownSubfield = getPushedDownSubfield(column); List nestedColumnPath = nestedColumnPath(pushedDownSubfield); Optional columnIO = findNestedColumnIO(lookupColumnByName(messageColumnIO, pushedDownSubfield.getRootName()), nestedColumnPath); @@ -747,39 +761,84 @@ public ConnectorPageSource createPageSource( .map(IcebergColumnHandle.class::cast) .collect(toImmutableList()); - Map partitionKeys = split.getPartitionKeys(); - - List regularColumns = columns.stream() - .map(IcebergColumnHandle.class::cast) - .filter(column -> column.getColumnType() != PARTITION_KEY && - !partitionKeys.containsKey(column.getId()) && - !IcebergMetadataColumn.isMetadataColumnId(column.getId())) - .collect(Collectors.toList()); - Optional tableSchemaJson = table.getTableSchemaJson(); verify(tableSchemaJson.isPresent(), "tableSchemaJson is null"); Schema tableSchema = SchemaParser.fromJson(tableSchemaJson.get()); + PartitionSpec partitionSpec = PartitionSpecParser.fromJson(tableSchema, split.getPartitionSpecAsJson()); - boolean equalityDeletesRequired = table.getIcebergTableName().getTableType() == IcebergTableType.DATA; - Set deleteFilterRequiredColumns = requiredColumnsForDeletes(tableSchema, split.getDeletes(), equalityDeletesRequired); + Map partitionKeys = split.getPartitionKeys(); - deleteFilterRequiredColumns.stream() + // the update row isn't a valid column that can be read from storage. + // Filter it out from columns passed to the storage page source. + Set columnsToReadFromStorage = icebergColumns.stream() + .filter(not(IcebergColumnHandle::isUpdateRowIdColumn)) + .collect(Collectors.toSet()); + + // add any additional columns which may need to be read from storage + // by delete filters + boolean equalityDeletesRequired = table.getIcebergTableName().getTableType() == IcebergTableType.DATA; + requiredColumnsForDeletes(tableSchema, partitionSpec, split.getDeletes(), equalityDeletesRequired) + .stream() .filter(not(icebergColumns::contains)) - .forEach(regularColumns::add); + .forEach(columnsToReadFromStorage::add); + + // finally, add the fields that the update column requires. 
+ Optional updateRow = icebergColumns.stream() + .filter(IcebergColumnHandle::isUpdateRowIdColumn) + .findFirst(); + updateRow.ifPresent(updateRowIdColumn -> { + Set alreadyRequiredColumnIds = columnsToReadFromStorage.stream() + .map(IcebergColumnHandle::getId) + .collect(toImmutableSet()); + updateRowIdColumn.getColumnIdentity().getChildren() + .stream() + .filter(colId -> !alreadyRequiredColumnIds.contains(colId.getId())) + .forEach(colId -> { + if (colId.getId() == ROW_POSITION.fieldId()) { + IcebergColumnHandle handle = IcebergColumnHandle.create(ROW_POSITION, typeManager, REGULAR); + columnsToReadFromStorage.add(handle); + } + else { + NestedField column = tableSchema.findField(colId.getId()); + if (column == null) { + throw new PrestoException(ICEBERG_MISSING_COLUMN, "Could not find field " + colId + " in table schema: " + tableSchema); + } + IcebergColumnHandle handle = IcebergColumnHandle.create(column, typeManager, REGULAR); + columnsToReadFromStorage.add(handle); + } + }); + }); // TODO: pushdownFilter for icebergLayout HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getIcebergTableName().getTableName()); - ConnectorPageSourceWithRowPositions connectorPageSourceWithRowPositions = createDataPageSource( - session, - hdfsContext, - new Path(split.getPath()), - split.getStart(), - split.getLength(), - split.getFileFormat(), - regularColumns, - icebergLayout.getValidPredicate(), - splitContext.isCacheable()); - ConnectorPageSource dataPageSource = connectorPageSourceWithRowPositions.getConnectorPageSource(); + Function, ConnectorPageSourceWithRowPositions> partitionPageSourceDelegate = + (columnList) -> createDataPageSource( + session, + hdfsContext, + new Path(split.getPath()), + split.getStart(), + split.getLength(), + split.getFileFormat(), + columnList, + icebergLayout.getValidPredicate(), + splitContext.isCacheable()); + + ImmutableMap.Builder metadataValues = ImmutableMap.builder(); + for (IcebergColumnHandle icebergColumn : icebergColumns) { + if (icebergColumn.isPathColumn()) { + metadataValues.put(icebergColumn.getColumnIdentity().getId(), utf8Slice(split.getPath())); + } + else if (icebergColumn.isDataSequenceNumberColumn()) { + metadataValues.put(icebergColumn.getColumnIdentity().getId(), split.getDataSequenceNumber()); + } + } + + List delegateColumns = columnsToReadFromStorage.stream().collect(toImmutableList()); + IcebergPartitionInsertingPageSource partitionInsertingPageSource = new IcebergPartitionInsertingPageSource( + delegateColumns, + metadataValues.build(), + partitionKeys, + partitionPageSourceDelegate); Optional outputPath = table.getOutputPath(); Optional> storageProperties = table.getStorageProperties(); @@ -811,31 +870,55 @@ public ConnectorPageSource createPageSource( tableSchema, split.getPath(), deletesToApply, - connectorPageSourceWithRowPositions.getStartRowPosition(), - connectorPageSourceWithRowPositions.getEndRowPosition()); + partitionInsertingPageSource.getRowPositionDelegate().getStartRowPosition(), + partitionInsertingPageSource.getRowPositionDelegate().getEndRowPosition()); return deleteFilters.stream() - .map(filter -> filter.createPredicate(regularColumns)) + .map(filter -> filter.createPredicate(delegateColumns)) .reduce(RowPredicate::and); }); + Table icebergTable = getShallowWrappedIcebergTable( + tableSchema, + partitionSpec, + table.getStorageProperties().orElseThrow(() -> new IllegalArgumentException("storage properties must not be null")), + Optional.empty()); + Supplier updatedRowPageSinkSupplier = () -> 
new IcebergPageSink( + icebergTable, + locationProvider, + fileWriterFactory, + pageIndexerFactory, + hdfsEnvironment, + hdfsContext, + getColumns(tableSchema, partitionSpec, typeManager), + jsonCodec, + session, + split.getFileFormat(), + maxOpenPartitions, + table.getSortOrder(), + sortParameters); - HashMap metadataValues = new HashMap<>(); - for (IcebergColumnHandle icebergColumn : icebergColumns) { - if (icebergColumn.isPathColumn()) { - metadataValues.put(icebergColumn.getColumnIdentity().getId(), utf8Slice(split.getPath())); - } - else if (icebergColumn.isDataSequenceNumberColumn()) { - metadataValues.put(icebergColumn.getColumnIdentity().getId(), split.getDataSequenceNumber()); - } - } + ConnectorPageSource dataSource = new IcebergUpdateablePageSource( + tableSchema, + icebergColumns, + metadataValues.build(), + partitionKeys, + partitionInsertingPageSource, + delegateColumns, + deleteSinkSupplier, + deletePredicate, + updatedRowPageSinkSupplier, + table.getUpdatedColumns(), + updateRow); - ConnectorPageSource dataSource = new IcebergUpdateablePageSource(icebergColumns, metadataValues, partitionKeys, dataPageSource, deleteSinkSupplier, deletePredicate); if (split.getChangelogSplitInfo().isPresent()) { dataSource = new ChangelogPageSource(dataSource, split.getChangelogSplitInfo().get(), (List) (List) desiredColumns, icebergColumns); } return dataSource; } - private Set requiredColumnsForDeletes(Schema schema, List deletes, boolean equalityDeletesRequired) + private Set requiredColumnsForDeletes(Schema schema, + PartitionSpec partitionSpec, + List deletes, + boolean equalityDeletesRequired) { ImmutableSet.Builder requiredColumns = ImmutableSet.builder(); for (DeleteFile deleteFile : deletes) { @@ -843,8 +926,7 @@ private Set requiredColumnsForDeletes(Schema schema, List IcebergColumnHandle.create(schema.findField(id), typeManager, IcebergColumnHandle.ColumnType.REGULAR)) + getColumns(deleteFile.equalityFieldIds().stream(), schema, partitionSpec, typeManager) .forEach(requiredColumns::add); } } @@ -942,7 +1024,7 @@ private ConnectorPageSource openDeletes( columns, tupleDomain, false) - .getConnectorPageSource(); + .getDelegate(); } private ConnectorPageSourceWithRowPositions createDataPageSource( @@ -1004,36 +1086,4 @@ private ConnectorPageSourceWithRowPositions createDataPageSource( } throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat); } - - private static final class ConnectorPageSourceWithRowPositions - { - private final ConnectorPageSource connectorPageSource; - private final Optional startRowPosition; - private final Optional endRowPosition; - - public ConnectorPageSourceWithRowPositions( - ConnectorPageSource connectorPageSource, - Optional startRowPosition, - Optional endRowPosition) - { - this.connectorPageSource = requireNonNull(connectorPageSource, "connectorPageSource is null"); - this.startRowPosition = requireNonNull(startRowPosition, "startRowPosition is null"); - this.endRowPosition = requireNonNull(endRowPosition, "endRowPosition is null"); - } - - public ConnectorPageSource getConnectorPageSource() - { - return connectorPageSource; - } - - public Optional getStartRowPosition() - { - return startRowPosition; - } - - public Optional getEndRowPosition() - { - return endRowPosition; - } - } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPartitionInsertingPageSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPartitionInsertingPageSource.java new file mode 100644 index 
0000000000000..b0a8eec44b726 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPartitionInsertingPageSource.java @@ -0,0 +1,214 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.common.Page; +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.block.RunLengthEncodedBlock; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.hive.HivePartitionKey; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.PrestoException; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.IntStream; + +import static com.facebook.presto.common.Utils.nativeValueToBlock; +import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY; +import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; +import static com.facebook.presto.iceberg.IcebergMetadataColumn.isMetadataColumnId; +import static com.facebook.presto.iceberg.IcebergUtil.deserializePartitionValue; +import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; + +/** + * This class narrows the set of columns read by the underlying file page source + * (the one that performs IO against on-disk files) to only those columns that + * are not partition or metadata fields. + * <p>
+ * When a new page is requested, this class inserts the partition fields as + * RLE-encoded blocks to save memory and IO cost at runtime. + */ +public class IcebergPartitionInsertingPageSource + implements ConnectorPageSource +{ + private final IcebergColumnHandle[] nonPartitionColumnIndexes; + private final ConnectorPageSourceWithRowPositions delegateWithPositions; + private final ConnectorPageSource delegate; + private final Block[] partitionValueBlocks; + private final long partitionValuesMemoryUsage; + // maps output array index to index of input page from delegate provider. + private final int[] outputIndexes; + + public IcebergPartitionInsertingPageSource( + List fullColumnList, + Map metadataValues, + Map partitionKeys, + Function, ConnectorPageSourceWithRowPositions> delegateSupplier) + { + this.nonPartitionColumnIndexes = new IcebergColumnHandle[fullColumnList.size()]; + this.outputIndexes = new int[fullColumnList.size()]; + populateIndexes(fullColumnList, partitionKeys); + this.partitionValueBlocks = generatePartitionValueBlocks(fullColumnList, metadataValues, partitionKeys); + this.partitionValuesMemoryUsage = Arrays.stream(partitionValueBlocks).filter(Objects::nonNull).mapToLong(Block::getRetainedSizeInBytes).sum(); + + List delegateColumns = Arrays.stream(nonPartitionColumnIndexes) + .filter(Objects::nonNull) + .collect(toImmutableList()); + + this.delegateWithPositions = delegateSupplier.apply(delegateColumns); + this.delegate = delegateWithPositions.getDelegate(); + } + + private void populateIndexes(List fullColumnList, Map partitionKeys) + { + int delegateIndex = 0; + // generate array of non-partition column indexes + for (int i = 0; i < fullColumnList.size(); i++) { + IcebergColumnHandle handle = fullColumnList.get(i); + if (handle.getColumnType() == PARTITION_KEY || + partitionKeys.containsKey(handle.getId()) || + isMetadataColumnId(handle.getId())) { + // is partition key, don't include for delegate supplier + continue; + } + nonPartitionColumnIndexes[i] = handle; + outputIndexes[i] = delegateIndex; + delegateIndex++; + } + } + + private Block[] generatePartitionValueBlocks( + List fullColumnList, + Map metadataValues, + Map partitionKeys) + { + return IntStream.range(0, fullColumnList.size()) + .mapToObj(index -> { + IcebergColumnHandle column = fullColumnList.get(index); + if (nonPartitionColumnIndexes[index] != null) { + return null; + } + + Type type = column.getType(); + if (partitionKeys.containsKey(column.getId())) { + HivePartitionKey icebergPartition = partitionKeys.get(column.getId()); + Object prefilledValue = deserializePartitionValue(type, icebergPartition.getValue().orElse(null), column.getName()); + return nativeValueToBlock(type, prefilledValue); + } + else if (column.getColumnType() == PARTITION_KEY) { + // Partition key with no value. 
This can happen after partition evolution + return nativeValueToBlock(type, null); + } + else if (isMetadataColumnId(column.getId())) { + return nativeValueToBlock(type, metadataValues.get(column.getColumnIdentity().getId())); + } + + return null; + }) + .toArray(Block[]::new); + } + + public ConnectorPageSourceWithRowPositions getRowPositionDelegate() + { + return delegateWithPositions; + } + + @Override + public long getCompletedBytes() + { + return delegate.getCompletedBytes(); + } + + @Override + public long getCompletedPositions() + { + return delegate.getCompletedPositions(); + } + + @Override + public long getReadTimeNanos() + { + return delegate.getReadTimeNanos(); + } + + @Override + public boolean isFinished() + { + return delegate.isFinished(); + } + + @Override + public Page getNextPage() + { + try { + Page dataPage = delegate.getNextPage(); + if (dataPage == null) { + return null; + } + int batchSize = dataPage.getPositionCount(); + Block[] blocks = new Block[nonPartitionColumnIndexes.length]; + for (int i = 0; i < nonPartitionColumnIndexes.length; i++) { + blocks[i] = partitionValueBlocks[i] == null ? + dataPage.getBlock(outputIndexes[i]) : + new RunLengthEncodedBlock(partitionValueBlocks[i], batchSize); + } + + return new Page(batchSize, blocks); + } + catch (RuntimeException e) { + closeWithSuppression(e); + throwIfInstanceOf(e, PrestoException.class); + throw new PrestoException(ICEBERG_BAD_DATA, e); + } + } + + protected void closeWithSuppression(Throwable throwable) + { + requireNonNull(throwable, "throwable is null"); + try { + close(); + } + catch (RuntimeException e) { + // Self-suppression not permitted + if (throwable != e) { + throwable.addSuppressed(e); + } + } + } + + @Override + public long getSystemMemoryUsage() + { + return delegate.getSystemMemoryUsage() + partitionValuesMemoryUsage; + } + + @Override + public void close() + { + try { + delegate.close(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java index 9d6b2f9f4a41f..78f77e78ed599 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java @@ -37,6 +37,7 @@ public class IcebergTableHandle private final Optional> partitionFieldIds; private final Optional> equalityFieldIds; private final List sortOrder; + private final List updatedColumns; @JsonCreator public IcebergTableHandle( @@ -48,7 +49,8 @@ public IcebergTableHandle( @JsonProperty("tableSchemaJson") Optional tableSchemaJson, @JsonProperty("partitionFieldIds") Optional> partitionFieldIds, @JsonProperty("equalityFieldIds") Optional> equalityFieldIds, - @JsonProperty("sortOrder") List sortOrder) + @JsonProperty("sortOrder") List sortOrder, + @JsonProperty("updatedColumns") List updatedColumns) { super(schemaName, icebergTableName.getTableName()); @@ -60,6 +62,7 @@ public IcebergTableHandle( this.partitionFieldIds = requireNonNull(partitionFieldIds, "partitionFieldIds is null"); this.equalityFieldIds = requireNonNull(equalityFieldIds, "equalityFieldIds is null"); this.sortOrder = ImmutableList.copyOf(requireNonNull(sortOrder, "sortOrder is null")); + this.updatedColumns = requireNonNull(updatedColumns, "updatedColumns is null"); } @JsonProperty @@ -110,6 +113,27 @@ public Optional> getEqualityFieldIds() return equalityFieldIds; } + 
@JsonProperty + public List getUpdatedColumns() + { + return updatedColumns; + } + + public IcebergTableHandle withUpdatedColumns(List updatedColumns) + { + return new IcebergTableHandle( + getSchemaName(), + icebergTableName, + snapshotSpecified, + outputPath, + storageProperties, + tableSchemaJson, + partitionFieldIds, + equalityFieldIds, + sortOrder, + updatedColumns); + } + @Override public boolean equals(Object o) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableName.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableName.java index c9f8349e4b24e..b199a9569ea25 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableName.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableName.java @@ -103,7 +103,7 @@ public String getTableNameWithType() @Override public String toString() { - return getTableNameWithType() + "@" + snapshotId; + return getTableNameWithType() + snapshotId.map(snap -> "@" + snap).orElse(""); } public static IcebergTableName from(String name) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java index fe289a75ca4d9..d7e763fe462f3 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java @@ -32,6 +32,7 @@ import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Locale.ENGLISH; +import static org.apache.iceberg.TableProperties.UPDATE_MODE; public class IcebergTableProperties { @@ -123,6 +124,15 @@ public IcebergTableProperties(IcebergConfig icebergConfig) "The maximum number of columns for which metrics are collected", icebergConfig.getMetricsMaxInferredColumn(), false)) + .add(new PropertyMetadata<>( + UPDATE_MODE, + "Update mode for the table", + createUnboundedVarcharType(), + RowLevelOperationMode.class, + RowLevelOperationMode.MERGE_ON_READ, + false, + value -> RowLevelOperationMode.fromName((String) value), + RowLevelOperationMode::modeName)) .build(); columnProperties = ImmutableList.of(stringProperty( @@ -195,4 +205,9 @@ public static Integer getMetricsMaxInferredColumn(Map tablePrope { return (Integer) tableProperties.get(METRICS_MAX_INFERRED_COLUMN); } + + public static RowLevelOperationMode getUpdateMode(Map tableProperties) + { + return (RowLevelOperationMode) tableProperties.get(UPDATE_MODE); + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java index 98b7a4690145d..16cc207683a9b 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java @@ -15,8 +15,8 @@ import com.facebook.presto.common.Page; import com.facebook.presto.common.block.Block; -import com.facebook.presto.common.block.RunLengthEncodedBlock; -import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.block.ColumnarRow; +import com.facebook.presto.common.block.RowBlock; import com.facebook.presto.hive.HivePartitionKey; import com.facebook.presto.iceberg.delete.IcebergDeletePageSink; import 
com.facebook.presto.iceberg.delete.RowPredicate; @@ -25,76 +25,125 @@ import com.facebook.presto.spi.UpdatablePageSource; import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.IntStream; -import static com.facebook.presto.common.Utils.nativeValueToBlock; -import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY; +import static com.facebook.presto.common.block.ColumnarRow.toColumnarRow; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; -import static com.facebook.presto.iceberg.IcebergUtil.deserializePartitionValue; +import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_MISSING_COLUMN; import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.function.UnaryOperator.identity; +/** + * Generates pages for an iceberg table while applying delete filters to rows + * _and_ by modifying the format of the output pages for update operators if + * required. + */ public class IcebergUpdateablePageSource implements UpdatablePageSource { - private final Block[] prefilledBlocks; - private final int[] delegateIndexes; private final ConnectorPageSource delegate; private final Supplier deleteSinkSupplier; - private IcebergDeletePageSink deleteSink; + private IcebergDeletePageSink positionDeleteSink; private final Supplier> deletePredicate; + private final List columns; + /** + * Columns actually updated in the query + */ + private final List updatedColumns; + private final Schema tableSchema; + private final Supplier updatedRowPageSinkSupplier; + private IcebergPageSink updatedRowPageSink; + // An array with one element per field in the $row_id column. 
The value in the array points to the + // channel where the data can be read from within the input page + private final int[] updateRowIdChildColumnIndexes; + // The $row_id's index in 'outputColumns', or -1 if there isn't one + private final int updateRowIdColumnIndex; + // Maps the Iceberg field ids of unmodified columns to their indexes in updateRowIdChildColumnIndexes + private final Map columnIdToRowIdColumnIndex = new HashMap<>(); + // Maps the Iceberg field ids of modified columns to their indexes in the updatedColumns columnValueAndRowIdChannels array + private final Map columnIdentityToUpdatedColumnIndex = new HashMap<>(); + private final int[] outputColumnToDelegateMapping; + public IcebergUpdateablePageSource( - List columns, + Schema tableSchema, + // represents the columns which need to be output from `getNextPage` + List outputColumns, Map metadataValues, Map partitionKeys, ConnectorPageSource delegate, + // represents the columns output by the delegate page source + List delegateColumns, Supplier deleteSinkSupplier, - Supplier> deletePredicate) + Supplier> deletePredicate, + Supplier updatedRowPageSinkSupplier, + // the columns that this page source is supposed to update + List updatedColumns, + Optional updateRowIdColumn) { - int size = requireNonNull(columns, "columns is null").size(); requireNonNull(partitionKeys, "partitionKeys is null"); + this.tableSchema = requireNonNull(tableSchema, "tableSchema is null"); + this.columns = requireNonNull(outputColumns, "columns is null"); this.delegate = requireNonNull(delegate, "delegate is null"); + // information for deletes this.deleteSinkSupplier = deleteSinkSupplier; - this.deletePredicate = requireNonNull(deletePredicate, "deletePredicate is null"); + // information for updates + this.updatedRowPageSinkSupplier = requireNonNull(updatedRowPageSinkSupplier, "updatedRowPageSinkSupplier is null"); + this.updatedColumns = requireNonNull(updatedColumns, "updatedColumns is null"); + this.outputColumnToDelegateMapping = new int[columns.size()]; + this.updateRowIdColumnIndex = updateRowIdColumn.map(columns::indexOf).orElse(-1); + this.updateRowIdChildColumnIndexes = updateRowIdColumn + .map(column -> new int[column.getColumnIdentity().getChildren().size()]) + .orElse(new int[0]); + Map columnToIndex = IntStream.range(0, delegateColumns.size()) + .boxed() + .collect(toImmutableMap(index -> delegateColumns.get(index).getColumnIdentity(), identity())); + updateRowIdColumn.ifPresent(column -> { + List rowIdFields = column.getColumnIdentity().getChildren(); + for (int i = 0; i < rowIdFields.size(); i++) { + ColumnIdentity columnIdentity = rowIdFields.get(i); + updateRowIdChildColumnIndexes[i] = requireNonNull(columnToIndex.get(columnIdentity), () -> format("Column %s not found in requiredColumns", columnIdentity)); + columnIdToRowIdColumnIndex.put(columnIdentity, i); + } + }); - prefilledBlocks = new Block[size]; - delegateIndexes = new int[size]; - - int outputIndex = 0; - int delegateIndex = 0; - for (IcebergColumnHandle column : columns) { - if (partitionKeys.containsKey(column.getId())) { - HivePartitionKey icebergPartition = partitionKeys.get(column.getId()); - Type type = column.getType(); - Object prefilledValue = deserializePartitionValue(type, icebergPartition.getValue().orElse(null), column.getName()); - prefilledBlocks[outputIndex] = nativeValueToBlock(type, prefilledValue); - delegateIndexes[outputIndex] = -1; + if (!updatedColumns.isEmpty()) { + for (int columnIndex = 0; columnIndex < updatedColumns.size(); columnIndex++) { + 
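                // The map below keys each updated column by its ColumnIdentity rather
                // than by name or channel position, so that updateRows() can match the
                // freshly computed value blocks back to the table schema's field order
                // no matter how the engine orders the incoming channels.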
IcebergColumnHandle updatedColumn = updatedColumns.get(columnIndex); + columnIdentityToUpdatedColumnIndex.put(updatedColumn.getColumnIdentity(), columnIndex); } - else if (column.getColumnType() == PARTITION_KEY) { - // Partition key with no value. This can happen after partition evolution - Type type = column.getType(); - prefilledBlocks[outputIndex] = nativeValueToBlock(type, null); - delegateIndexes[outputIndex] = -1; + } + for (int i = 0; i < outputColumnToDelegateMapping.length; i++) { + if (outputColumns.get(i).isUpdateRowIdColumn()) { + continue; } - else if (IcebergMetadataColumn.isMetadataColumnId(column.getId())) { - prefilledBlocks[outputIndex] = nativeValueToBlock(column.getType(), metadataValues.get(column.getColumnIdentity().getId())); - delegateIndexes[outputIndex] = -1; + + if (!columnToIndex.containsKey(outputColumns.get(i).getColumnIdentity())) { + throw new PrestoException(ICEBERG_MISSING_COLUMN, format("Column %s not found in delegate column map", outputColumns.get(i))); } else { - delegateIndexes[outputIndex] = delegateIndex; - delegateIndex++; + outputColumnToDelegateMapping[i] = columnToIndex.get(outputColumns.get(i).getColumnIdentity()); } - outputIndex++; } } @@ -122,6 +171,16 @@ public boolean isFinished() return delegate.isFinished(); } + /** + * This method takes three steps in order to generate the final output page. + *
+ * 1. Retrieve rows from the delegate page source. Usually this is the source for the file + * format, such as {@link com.facebook.presto.hive.parquet.ParquetPageSource} or + * {@link IcebergPartitionInsertingPageSource}. + * 2. Using the newly retrieved page, apply any necessary delete filters. + * 3. Finally, take the necessary channels from the page with the delete filters applied and + * nest them into the updateRowId channel in {@link #setUpdateRowIdBlock(Page)} + */ @Override public Page getNextPage() { @@ -136,17 +195,7 @@ public Page getNextPage() dataPage = deleteFilterPredicate.get().filterPage(dataPage); } - int batchSize = dataPage.getPositionCount(); - Block[] blocks = new Block[prefilledBlocks.length]; - for (int i = 0; i < prefilledBlocks.length; i++) { - if (prefilledBlocks[i] != null) { - blocks[i] = new RunLengthEncodedBlock(prefilledBlocks[i], batchSize); - } - else { - blocks[i] = dataPage.getBlock(delegateIndexes[i]); - } - } - return new Page(batchSize, blocks); + return setUpdateRowIdBlock(dataPage); } catch (RuntimeException e) { closeWithSuppression(e); @@ -158,27 +207,108 @@ public Page getNextPage() @Override public void deleteRows(Block rowIds) { - if (deleteSink == null) { - deleteSink = deleteSinkSupplier.get(); + if (positionDeleteSink == null) { + positionDeleteSink = deleteSinkSupplier.get(); } - deleteSink.appendPage(new Page(rowIds)); + positionDeleteSink.appendPage(new Page(rowIds)); } @Override - public CompletableFuture> finish() + public void updateRows(Page page, List columnValueAndRowIdChannels) { - if (deleteSink == null) { - return CompletableFuture.completedFuture(ImmutableList.of()); + int rowIdChannel = columnValueAndRowIdChannels.get(columnValueAndRowIdChannels.size() - 1); + List columnChannelMapping = columnValueAndRowIdChannels.subList(0, columnValueAndRowIdChannels.size() - 1); + + if (positionDeleteSink == null) { + positionDeleteSink = deleteSinkSupplier.get(); + verify(positionDeleteSink != null); + } + if (updatedRowPageSink == null) { + updatedRowPageSink = updatedRowPageSinkSupplier.get(); + verify(updatedRowPageSink != null); + } + + ColumnarRow rowIdColumns = toColumnarRow(page.getBlock(rowIdChannel)); + positionDeleteSink.appendPage(new Page(rowIdColumns.getField(0))); + + Set updatedColumnFieldIds = columnIdentityToUpdatedColumnIndex.keySet(); + List tableColumns = tableSchema.columns(); + Block[] fullPage = new Block[tableColumns.size()]; + for (int targetChannel = 0; targetChannel < tableColumns.size(); targetChannel++) { + Types.NestedField column = tableColumns.get(targetChannel); + ColumnIdentity columnIdentity = ColumnIdentity.createColumnIdentity(column); + if (updatedColumnFieldIds.contains(columnIdentity)) { + fullPage[targetChannel] = page.getBlock(columnChannelMapping.get(columnIdentityToUpdatedColumnIndex.get(columnIdentity))); + } + else { + fullPage[targetChannel] = rowIdColumns.getField(columnIdToRowIdColumnIndex.get(columnIdentity)); + } } - return deleteSink.finish(); + updatedRowPageSink.appendPage(new Page(page.getPositionCount(), fullPage)); + } + + @Override + public CompletableFuture> finish() + { + return Optional.ofNullable(positionDeleteSink) + .map(IcebergDeletePageSink::finish) + .orElseGet(() -> completedFuture(ImmutableList.of())) + .thenCombine( + Optional.ofNullable(updatedRowPageSink).map(IcebergPageSink::finish) + .orElseGet(() -> completedFuture(ImmutableList.of())), + (positionDeletes, writtenFiles) -> ImmutableList.builder() + .addAll(positionDeletes) + .addAll(writtenFiles) + .build()); } 
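The finish() implementation above merges commit fragments from two writers that are each created lazily and may never exist: the position-delete sink and the sink for rewritten rows. A minimal, self-contained sketch of that combination pattern, assuming Guava on the classpath (the class name CombineFinishSketch and the string payloads are illustrative, not part of the patch):

    import com.google.common.collect.ImmutableList;

    import java.util.List;
    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;

    import static java.util.concurrent.CompletableFuture.completedFuture;

    public class CombineFinishSketch
    {
        // Combine the results of two optional writers; an absent writer contributes
        // an empty fragment list instead of blocking or failing the commit.
        public static <T> CompletableFuture<List<T>> combine(
                Optional<CompletableFuture<List<T>>> positionDeletes,
                Optional<CompletableFuture<List<T>>> rewrittenRows)
        {
            return positionDeletes.orElseGet(() -> completedFuture(ImmutableList.of()))
                    .thenCombine(rewrittenRows.orElseGet(() -> completedFuture(ImmutableList.of())),
                            (deletes, writes) -> ImmutableList.<T>builder().addAll(deletes).addAll(writes).build());
        }

        public static void main(String[] args)
        {
            // Both sinks produced one fragment each; the combined future carries both.
            System.out.println(combine(
                    Optional.of(completedFuture(ImmutableList.of("position-delete-file"))),
                    Optional.of(completedFuture(ImmutableList.of("rewritten-data-file")))).join());
        }
    }

An UPDATE that touches no rows therefore still completes cleanly: neither sink is instantiated and the combined future resolves to an empty fragment list.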
@Override public void abort() { - if (deleteSink != null) { - deleteSink.abort(); + if (positionDeleteSink != null) { + positionDeleteSink.abort(); + } + + if (updatedRowPageSink != null) { + updatedRowPageSink.abort(); + } + } + + /** + * The $row_id column used for updates is a composite column of at least one other column in the Page. + * The indexes of the columns needed for the $row_id are in the updateRowIdChildColumnIndexes array. + * + * @param page The raw Page from the Parquet/ORC reader. + * @return A Page where the $row_id channel has been populated. + */ + private Page setUpdateRowIdBlock(Page page) + { + Block[] fullPage = new Block[columns.size()]; + Block[] rowIdFields; + Consumer loopFunc; + if (updateRowIdColumnIndex == -1 || updatedColumns.isEmpty()) { + loopFunc = (channel) -> fullPage[channel] = page.getBlock(outputColumnToDelegateMapping[channel]); } + else { + rowIdFields = new Block[updateRowIdChildColumnIndexes.length]; + for (int childIndex = 0; childIndex < updateRowIdChildColumnIndexes.length; childIndex++) { + rowIdFields[childIndex] = page.getBlock(updateRowIdChildColumnIndexes[childIndex]); + } + loopFunc = (channel) -> { + if (channel == updateRowIdColumnIndex) { + fullPage[channel] = RowBlock.fromFieldBlocks(page.getPositionCount(), Optional.empty(), rowIdFields); + } + else { + fullPage[channel] = page.getBlock(outputColumnToDelegateMapping[channel]); + } + }; + } + + for (int channel = 0; channel < columns.size(); channel++) { + loopFunc.accept(channel); + } + + return new Page(page.getPositionCount(), fullPage); } @Override @@ -202,8 +332,12 @@ public String toString() public long getSystemMemoryUsage() { long totalMemUsage = delegate.getSystemMemoryUsage(); - if (deleteSink != null) { - totalMemUsage += deleteSink.getSystemMemoryUsage(); + if (positionDeleteSink != null) { + totalMemUsage += positionDeleteSink.getSystemMemoryUsage(); + } + + if (updatedRowPageSink != null) { + totalMemUsage += updatedRowPageSink.getSystemMemoryUsage(); } return totalMemUsage; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 19375667adfd2..d7b3c541135f1 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -196,6 +196,7 @@ import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.UPDATE_MODE; +import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT; import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL; import static org.apache.iceberg.types.Type.TypeID.BINARY; import static org.apache.iceberg.types.Type.TypeID.FIXED; @@ -335,11 +336,19 @@ public static Map> getPartitionFields(PartitionSpec partiti } public static List getColumns(Schema schema, PartitionSpec partitionSpec, TypeManager typeManager) + { + return getColumns(schema.columns().stream().map(NestedField::fieldId), schema, partitionSpec, typeManager); + } + + public static List getColumns(Stream fields, Schema schema, PartitionSpec partitionSpec, TypeManager typeManager) { Set partitionFieldNames = getPartitionFields(partitionSpec, IDENTITY).keySet(); - return schema.columns().stream() - .map(column -> partitionFieldNames.contains(column.name()) ? 
IcebergColumnHandle.create(column, typeManager, PARTITION_KEY) : IcebergColumnHandle.create(column, typeManager, REGULAR)) + return fields + .map(schema::findField) + .map(column -> partitionFieldNames.contains(column.name()) ? + IcebergColumnHandle.create(column, typeManager, PARTITION_KEY) : + IcebergColumnHandle.create(column, typeManager, REGULAR)) .collect(toImmutableList()); } @@ -1150,10 +1159,13 @@ public static Map populateTableProperties(ConnectorTableMetadata if (parseFormatVersion(formatVersion) < MIN_FORMAT_VERSION_FOR_DELETE) { propertiesBuilder.put(DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName()); + propertiesBuilder.put(UPDATE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName()); } else { RowLevelOperationMode deleteMode = IcebergTableProperties.getDeleteMode(tableMetadata.getProperties()); propertiesBuilder.put(DELETE_MODE, deleteMode.modeName()); + RowLevelOperationMode updateMode = IcebergTableProperties.getUpdateMode(tableMetadata.getProperties()); + propertiesBuilder.put(UPDATE_MODE, updateMode.modeName()); } Integer metadataPreviousVersionsMax = IcebergTableProperties.getMetadataPreviousVersionsMax(tableMetadata.getProperties()); @@ -1184,6 +1196,13 @@ public static RowLevelOperationMode getDeleteMode(Table table) .toUpperCase(Locale.ENGLISH)); } + public static RowLevelOperationMode getUpdateMode(Table table) + { + return RowLevelOperationMode.fromName(table.properties() + .getOrDefault(UPDATE_MODE, UPDATE_MODE_DEFAULT) + .toUpperCase(Locale.ENGLISH)); + } + public static int getMetadataPreviousVersionsMax(Table table) { return Integer.parseInt(table.properties() diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/IcebergDeletePageSink.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/IcebergDeletePageSink.java index 0a9043684ba8f..ac7ee1aa9bbcf 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/IcebergDeletePageSink.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/IcebergDeletePageSink.java @@ -46,6 +46,7 @@ import static com.facebook.presto.common.Utils.nativeValueToBlock; import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf; +import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_ROLLBACK_ERROR; import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromJson; @@ -149,7 +150,8 @@ public CompletableFuture> finish() partitionSpec.specId(), partitionData.map(PartitionData::toJson), fileFormat, - dataFile); + dataFile, + POSITION_DELETES); commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task))); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java index 991c77bc6053d..18635bb4ce806 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java @@ -59,7 +59,6 @@ import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.types.Type; @@ -331,10 +330,11 @@ 
private TableScanNode createDeletesTableScan(ImmutableMap assignmentsBuilder = ImmutableMap.builder() diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index a308202869338..da2c1c41a5a40 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -123,6 +123,7 @@ public void testDescribeTable() MaterializedResult actualColumns = computeActual("DESCRIBE orders"); Assert.assertEquals(actualColumns, expectedColumns); } + @Test public void testShowCreateTable() { @@ -146,7 +147,8 @@ public void testShowCreateTable() " location = '%s',\n" + " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + - " metrics_max_inferred_column = 100\n" + + " metrics_max_inferred_column = 100,\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + ")", schemaName, getLocation(schemaName, "orders"))); } @@ -427,7 +429,8 @@ protected void testCreatePartitionedTableAs(Session session, FileFormat fileForm " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + - " partitioning = ARRAY['order_status','ship_priority','bucket(order_key, 9)']\n" + + " partitioning = ARRAY['order_status','ship_priority','bucket(order_key, 9)'],\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getSession().getCatalog().get(), getSession().getSchema().get(), @@ -630,7 +633,8 @@ public void testTableComments() " location = '%s',\n" + " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + - " metrics_max_inferred_column = 100\n" + + " metrics_max_inferred_column = 100,\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + ")"; String createTableSql = format(createTableTemplate, schemaName, "test table comment", getLocation(schemaName, "test_table_comments")); @@ -722,7 +726,8 @@ private void testCreateTableLike() " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + - " partitioning = ARRAY['adate']\n" + + " partitioning = ARRAY['adate'],\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getLocation(schemaName, "test_create_table_like_original"))); assertUpdate(session, "CREATE TABLE test_create_table_like_copy0 (LIKE test_create_table_like_original, col2 INTEGER)"); @@ -738,7 +743,8 @@ private void testCreateTableLike() " location = '%s',\n" + " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + - " metrics_max_inferred_column = 100\n" + + " metrics_max_inferred_column = 100,\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getLocation(schemaName, "test_create_table_like_copy1"))); dropTable(session, "test_create_table_like_copy1"); @@ -750,7 +756,8 @@ private void testCreateTableLike() " location = '%s',\n" + " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + - " metrics_max_inferred_column = 100\n" + + " metrics_max_inferred_column = 100,\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getLocation(schemaName, "test_create_table_like_copy2"))); dropTable(session, "test_create_table_like_copy2"); @@ -764,22 +771,24 @@ private void testCreateTableLike() " metadata_delete_after_commit = false,\n" + " 
metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + - " partitioning = ARRAY['adate']\n" + + " partitioning = ARRAY['adate'],\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getLocation(schemaName, "test_create_table_like_original"))); dropTable(session, "test_create_table_like_copy3"); assertUpdate(session, "CREATE TABLE test_create_table_like_copy4 (LIKE test_create_table_like_original INCLUDING PROPERTIES) WITH (format = 'ORC')"); assertEquals(getTablePropertiesString("test_create_table_like_copy4"), format("WITH (\n" + - " delete_mode = 'merge-on-read',\n" + - " format = 'ORC',\n" + - " format_version = '2',\n" + - " location = '%s',\n" + - " metadata_delete_after_commit = false,\n" + - " metadata_previous_versions_max = 100,\n" + - " metrics_max_inferred_column = 100,\n" + - " partitioning = ARRAY['adate']\n" + - ")", + " delete_mode = 'merge-on-read',\n" + + " format = 'ORC',\n" + + " format_version = '2',\n" + + " location = '%s',\n" + + " metadata_delete_after_commit = false,\n" + + " metadata_previous_versions_max = 100,\n" + + " metrics_max_inferred_column = 100,\n" + + " partitioning = ARRAY['adate'],\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + + ")", getLocation(schemaName, "test_create_table_like_original"))); dropTable(session, "test_create_table_like_copy4"); } @@ -794,7 +803,8 @@ private void testCreateTableLike() " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + - " partitioning = ARRAY['adate']\n" + + " partitioning = ARRAY['adate'],\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getLocation(schemaName, "test_create_table_like_copy5"))); dropTable(session, "test_create_table_like_copy5"); @@ -841,14 +851,16 @@ protected void testCreateTableWithFormatVersion(String formatVersion, String def " location = '%s',\n" + " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + - " metrics_max_inferred_column = 100\n" + + " metrics_max_inferred_column = 100,\n" + + " \"write.update.mode\" = '%s'\n" + ")", getSession().getCatalog().get(), getSession().getSchema().get(), "test_create_table_with_format_version_" + formatVersion, defaultDeleteMode, formatVersion, - getLocation(getSession().getSchema().get(), "test_create_table_with_format_version_" + formatVersion)); + getLocation(getSession().getSchema().get(), "test_create_table_with_format_version_" + formatVersion), + defaultDeleteMode); MaterializedResult actualResult = computeActual("SHOW CREATE TABLE test_create_table_with_format_version_" + formatVersion); assertEquals(getOnlyElement(actualResult.getOnlyColumnAsSet()), createTableSql); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index d3a4c74e1c67b..64736be5edc56 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -2313,6 +2313,97 @@ public void testInformationSchemaQueries() assertQuerySucceeds("DROP SCHEMA ICEBERG.TEST_SCHEMA2"); } + @Test + public void testUpdateWithPredicates() + { + String tableName = "test_update_predicates_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + "(id int, full_name varchar(20))"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'aaaa'), (2, 
'bbbb'), (3, 'cccc')", 3); + // update single row on id + assertUpdate("UPDATE " + tableName + " SET full_name = 'aaaa AAAA' WHERE id = 1", 1); + assertQuery("SELECT id, full_name FROM " + tableName, "VALUES (1, 'aaaa AAAA'), (2, 'bbbb'), (3, 'cccc')"); + + // update single row with compound predicate + assertUpdate("UPDATE " + tableName + " SET full_name = 'aaaa' WHERE id = 1 and full_name='aaaa AAAA'", 1); + assertQuery("SELECT id, full_name FROM " + tableName, "VALUES (1, 'aaaa'), (2, 'bbbb'), (3, 'cccc')"); + + // update multiple rows at once + assertUpdate("UPDATE " + tableName + " SET full_name = 'ssss' WHERE id != 1 ", 2); + assertQuery("SELECT id, full_name FROM " + tableName, "VALUES (1, 'aaaa'), (2, 'ssss'), (3, 'ssss')"); + + // update with filter matching no rows + assertUpdate("UPDATE " + tableName + " SET full_name = 'ssss' WHERE id > 4 ", 0); + assertQuery("SELECT id, full_name FROM " + tableName, "VALUES (1, 'aaaa'), (2, 'ssss'), (3, 'ssss')"); + + // add column and update null values + assertUpdate("ALTER TABLE " + tableName + " ADD column email varchar"); + assertQuery("SELECT id, full_name, email FROM " + tableName, "VALUES (1, 'aaaa', NULL), (2, 'ssss', NULL), (3, 'ssss', NULL)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (4, 'dddd', 'ddd@gmail.com')", 1); + assertUpdate("UPDATE " + tableName + " SET email = 'abc@gmail.com' WHERE id in(1, 2, 3)", 3); + assertQuery("SELECT id, full_name, email FROM " + tableName, "VALUES (1, 'aaaa', 'abc@gmail.com'), (2, 'ssss', 'abc@gmail.com'), (3, 'ssss', 'abc@gmail.com'), (4, 'dddd', 'ddd@gmail.com') "); + + // set all values to null + assertUpdate("UPDATE " + tableName + " SET email = NULL", 4); + assertQuery("SELECT email FROM " + tableName + " WHERE email is NULL", "VALUES NULL, NULL, NULL, NULL"); + + // update nulls to non-null + assertUpdate("UPDATE " + tableName + " SET email = 'test@gmail.com' WHERE email is NULL", 4); + assertQuery("SELECT count(*) FROM " + tableName + " WHERE email is not NULL", "VALUES 4"); + } + + @Test + public void testUpdateAllValues() + { + String tableName = "test_update_all_values_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + "(a int, b int, c int)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 2, 3), (11, 12, 13), (21, 22, 23)", 3); + assertUpdate("UPDATE " + tableName + " SET a = a + 1, b = b - 1, c = c * 2", 3); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, 1, 6), (12, 11, 26), (22, 21, 46)"); + + // update multiple columns with predicate + assertUpdate("UPDATE " + tableName + " SET a = a + 1, b = b - 1, c = c * 2 WHERE a = 2 AND b = 1", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (3, 0, 12), (12, 11, 26), (22, 21, 46)"); + } + + @Test + public void testUpdateRowType() + { + String tableName = "test_update_row_type" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + "(int_t INT, row_t ROW(f1 INT, f2 INT))"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, ROW(2, 3)), (11, ROW(12, 13)), (21, ROW(22, 23))", 3); + assertUpdate("UPDATE " + tableName + " SET int_t = int_t - 1 WHERE row_t.f2 = 3", 1); + assertQuery("SELECT int_t, row_t.f1, row_t.f2 FROM " + tableName, "VALUES (0, 2, 3), (11, 12, 13), (21, 22, 23)"); + assertUpdate("UPDATE " + tableName + " SET row_t = ROW(row_t.f1, row_t.f2 + 1) WHERE int_t = 11", 1); + assertQuery("SELECT int_t, row_t.f1, row_t.f2 FROM " + tableName, "VALUES (0, 2, 3), (11, 12, 14), (21, 22, 23)"); + assertUpdate("UPDATE " + tableName + " SET row_t = ROW(row_t.f1 * 2, 
row_t.f2) WHERE row_t.f1 = 22", 1);
+        assertQuery("SELECT int_t, row_t.f1, row_t.f2 FROM " + tableName, "VALUES (0, 2, 3), (11, 12, 14), (21, 44, 23)");
+    }
+
+    @Test
+    public void testUpdateOnPartitionTable()
+    {
+        String tableName = "test_update_partition_column_" + randomTableSuffix();
+        assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar(10))" + " with(partitioning=ARRAY['a'])");
+        assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'first'), (1, 'second'), (2, 'third')", 3);
+        // update all rows on the partition column
+        assertUpdate("UPDATE " + tableName + " SET a = a + 1", 3);
+        assertQuery("SELECT a, b FROM " + tableName, "VALUES (2,'first'), (2,'second'), (3,'third')");
+
+        // update the partition column using a value derived from a non-partition column
+        assertUpdate("UPDATE " + tableName + " SET a = a + (CASE b WHEN 'first' THEN 1 ELSE 0 END)", 3);
+        assertQuery("SELECT a, b FROM " + tableName, "VALUES (3,'first'), (2,'second'), (3,'third')");
+
+        // update the partition column with a predicate
+        assertUpdate("UPDATE " + tableName + " SET a = a + 1 WHERE b = 'second'", 1);
+        assertQuery("SELECT a, b FROM " + tableName, "VALUES (3,'first'), (3,'second'), (3,'third')");
+
+        // repeat the partition column update with the same predicate
+        assertUpdate("UPDATE " + tableName + " SET a = a + 1 WHERE b = 'second'", 1);
+        assertQuery("SELECT a, b FROM " + tableName, "VALUES (3,'first'), (4,'second'), (3,'third')");
+
+        // update a non-partition column on a partitioned table with a predicate
+        assertUpdate("UPDATE " + tableName + " SET b = CONCAT(CAST(a as varchar), CASE a WHEN 1 THEN 'st' WHEN 2 THEN 'nd' WHEN 3 THEN 'rd' ELSE 'th' END) WHERE b = 'second'", 1);
+        assertQuery("SELECT a, b FROM " + tableName, "VALUES (3,'first'), (4,'4th'), (3,'third')");
+    }
+
     private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
     {
         // check delete file list
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java
index 15d09ddfd9bc0..271137a0e52f8 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java
@@ -164,7 +164,7 @@ public void testSnapshotsTable()
                         "('manifest_list', 'varchar', '', '')," +
                         "('summary', 'map(varchar, varchar)', '', '')");
 
-        assertQuery("SELECT operation FROM test_schema.\"test_table$snapshots\"", "VALUES 'append', 'append'");
+        assertQuery("SELECT operation FROM test_schema.\"test_table$snapshots\"", "VALUES 'overwrite', 'overwrite'");
         assertQuery("SELECT summary['total-records'] FROM test_schema.\"test_table$snapshots\"", "VALUES '3', '6'");
     }
 
@@ -258,14 +258,16 @@ protected void checkTableProperties(String tableName, String deleteMode)
     {
         assertQuery(String.format("SHOW COLUMNS FROM test_schema.\"%s$properties\"", tableName),
                 "VALUES ('key', 'varchar', '', '')," +
                         "('value', 'varchar', '', '')");
-        assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 7");
+        assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 8");
         List<MaterializedRow> materializedRows = computeActual(getSession(), String.format("SELECT * FROM test_schema.\"%s$properties\"", tableName)).getMaterializedRows();
 
-        assertThat(materializedRows).hasSize(7);
+        assertThat(materializedRows).hasSize(8);
         assertThat(materializedRows)
                 .anySatisfy(row -> assertThat(row)
                         .isEqualTo(new
MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode))) + .anySatisfy(row -> assertThat(row) + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.update.mode", deleteMode))) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.format.default", "PARQUET"))) .anySatisfy(row -> assertThat(row) @@ -284,14 +286,16 @@ protected void checkORCFormatTableProperties(String tableName, String deleteMode { assertQuery(String.format("SHOW COLUMNS FROM test_schema.\"%s$properties\"", tableName), "VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')"); - assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 8"); + assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 9"); List materializedRows = computeActual(getSession(), String.format("SELECT * FROM test_schema.\"%s$properties\"", tableName)).getMaterializedRows(); - assertThat(materializedRows).hasSize(8); + assertThat(materializedRows).hasSize(9); assertThat(materializedRows) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode))) + .anySatisfy(row -> assertThat(row) + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.update.mode", deleteMode))) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.format.default", "ORC"))) .anySatisfy(row -> assertThat(row) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java index 2a7c64f23a7a3..6d894ddb934c2 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java @@ -287,7 +287,7 @@ public void testGenerateStatisticColumnSets() .setDataColumns(ImmutableList.of()) .setPredicateColumns(ImmutableMap.of()) .setRequestedColumns(Optional.empty()) - .setTable(new IcebergTableHandle("test", IcebergTableName.from("test"), false, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Collections.emptyList())) + .setTable(new IcebergTableHandle("test", IcebergTableName.from("test"), false, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), ImmutableList.of(), ImmutableList.of())) .setDomainPredicate(TupleDomain.all()); // verify all selected columns are included List includedColumns = combineSelectedAndPredicateColumns( diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java index 1ddc4c3235e48..5f606777596d0 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java @@ -89,11 +89,11 @@ protected void checkTableProperties(String tableName, String deleteMode) { assertQuery(String.format("SHOW COLUMNS FROM test_schema.\"%s$properties\"", tableName), "VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')"); - assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 
9"); + assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 10"); List materializedRows = computeActual(getSession(), String.format("SELECT * FROM test_schema.\"%s$properties\"", tableName)).getMaterializedRows(); - assertThat(materializedRows).hasSize(9); + assertThat(materializedRows).hasSize(10); assertThat(materializedRows) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode))) @@ -118,11 +118,11 @@ protected void checkORCFormatTableProperties(String tableName, String deleteMode { assertQuery(String.format("SHOW COLUMNS FROM test_schema.\"%s$properties\"", tableName), "VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')"); - assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 10"); + assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 11"); List materializedRows = computeActual(getSession(), String.format("SELECT * FROM test_schema.\"%s$properties\"", tableName)).getMaterializedRows(); - assertThat(materializedRows).hasSize(10); + assertThat(materializedRows).hasSize(11); assertThat(materializedRows) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode))) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestSetTablePropertyProcedure.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestSetTablePropertyProcedure.java index a6ff137f8bc32..ef809bbcf725d 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestSetTablePropertyProcedure.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestSetTablePropertyProcedure.java @@ -71,14 +71,14 @@ public void testSetTablePropertyProcedurePositionalArgs() Table table = loadTable(tableName); table.refresh(); - assertEquals(table.properties().size(), 7); + assertEquals(table.properties().size(), 8); assertEquals(table.properties().get(propertyKey), null); assertUpdate(format("CALL system.set_table_property('%s', '%s', '%s', '%s')", TEST_SCHEMA, tableName, propertyKey, propertyValue)); table.refresh(); // now the table property read.split.target-size should have new value - assertEquals(table.properties().size(), 8); + assertEquals(table.properties().size(), 9); assertEquals(table.properties().get(propertyKey), propertyValue); } finally { @@ -99,7 +99,7 @@ public void testSetTablePropertyProcedureNamedArgs() Table table = loadTable(tableName); table.refresh(); - assertEquals(table.properties().size(), 7); + assertEquals(table.properties().size(), 8); assertEquals(table.properties().get(propertyKey), null); assertUpdate(format("CALL system.set_table_property(schema => '%s', key => '%s', value => '%s', table_name => '%s')", @@ -107,7 +107,7 @@ public void testSetTablePropertyProcedureNamedArgs() table.refresh(); // now the table property read.split.target-size should have new value - assertEquals(table.properties().size(), 8); + assertEquals(table.properties().size(), 9); assertEquals(table.properties().get(propertyKey), propertyValue); } finally { @@ -129,14 +129,14 @@ public void testSetTablePropertyProcedureUpdateExisting() Table table = loadTable(tableName); table.refresh(); - assertEquals(table.properties().size(), 7); + assertEquals(table.properties().size(), 8); assertEquals(table.properties().get(propertyKey), "4"); 
assertUpdate(format("CALL system.set_table_property('%s', '%s', '%s', '%s')", TEST_SCHEMA, tableName, propertyKey, propertyValue)); table.refresh(); // now the table property commit.retry.num-retries should have new value - assertEquals(table.properties().size(), 7); + assertEquals(table.properties().size(), 8); assertEquals(table.properties().get(propertyKey), propertyValue); } finally { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java index 1613bcfd9ba10..0cfa1b751bb50 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java @@ -180,7 +180,8 @@ public void testShowCreateTable() " location = '%s',\n" + " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + - " metrics_max_inferred_column = 100\n" + + " metrics_max_inferred_column = 100,\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + ")", schemaName, getLocation(schemaName, "orders"))); } @@ -215,7 +216,8 @@ public void testTableComments() " location = '%s',\n" + " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + - " metrics_max_inferred_column = 100\n" + + " metrics_max_inferred_column = 100,\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + ")"; String createTableSql = format(createTableTemplate, schemaName, "test table comment", getLocation(schemaName, "test_table_comments")); @@ -255,7 +257,8 @@ protected void testCreatePartitionedTableAs(Session session, FileFormat fileForm " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + " metrics_max_inferred_column = 100,\n" + - " partitioning = ARRAY['order_status','ship_priority','bucket(order_key, 9)']\n" + + " partitioning = ARRAY['order_status','ship_priority','bucket(order_key, 9)'],\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + ")", getSession().getCatalog().get(), getSession().getSchema().get(), @@ -315,14 +318,16 @@ protected void testCreateTableWithFormatVersion(String formatVersion, String def " location = '%s',\n" + " metadata_delete_after_commit = false,\n" + " metadata_previous_versions_max = 100,\n" + - " metrics_max_inferred_column = 100\n" + + " metrics_max_inferred_column = 100,\n" + + " \"write.update.mode\" = '%s'\n" + ")", getSession().getCatalog().get(), getSession().getSchema().get(), "test_create_table_with_format_version_" + formatVersion, defaultDeleteMode, formatVersion, - getLocation(getSession().getSchema().get(), "test_create_table_with_format_version_" + formatVersion)); + getLocation(getSession().getSchema().get(), "test_create_table_with_format_version_" + formatVersion), + defaultDeleteMode); MaterializedResult actualResult = computeActual("SHOW CREATE TABLE test_create_table_with_format_version_" + formatVersion); assertEquals(getOnlyElement(actualResult.getOnlyColumnAsSet()), createTableSql); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java index d769550178449..6cdc1c318b08a 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/TableWriteInfo.java 
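The next hunks wire UPDATE through the engine side, and the ordering they introduce matters: the planner calls metadata.beginUpdate before metadata.getUpdateRowIdColumnHandle, because the $row_id handle is derived from the set of columns the UPDATE does not overwrite. A toy sketch of that dependency (the names below are simplified stand-ins, not the real Metadata SPI signatures):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public final class UpdatePlanningSketch
    {
        // Pretend table schema: (id, full_name, email)
        private static final List<String> SCHEMA = Arrays.asList("id", "full_name", "email");

        private static List<String> updatedColumns = Arrays.asList();

        // Stand-in for beginUpdate: the connector records which columns change.
        static void beginUpdate(List<String> columns)
        {
            updatedColumns = columns;
        }

        // Stand-in for getUpdateRowIdColumnHandle: the $row_id row carries the
        // physical position plus every column the UPDATE leaves untouched, so it
        // cannot be built until beginUpdate has run.
        static List<String> getUpdateRowIdColumnHandle()
        {
            List<String> fields = new ArrayList<>();
            fields.add("$pos");
            for (String column : SCHEMA) {
                if (!updatedColumns.contains(column)) {
                    fields.add(column);
                }
            }
            return fields;
        }

        public static void main(String[] args)
        {
            beginUpdate(Arrays.asList("email"));
            System.out.println(getUpdateRowIdColumnHandle()); // prints [$pos, id, full_name]
        }
    }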
@@ -106,6 +106,10 @@ private static Optional createWriterTarget(Optional updatedColumnNames.contains(entry.getKey())) .map(Map.Entry::getValue) .collect(toImmutableList()); + handle = metadata.beginUpdate(session, handle, updatedColumns); ColumnHandle rowIdHandle = metadata.getUpdateRowIdColumnHandle(session, handle, updatedColumns); Type rowIdType = metadata.getColumnMetadata(session, handle, rowIdHandle).getType(); @@ -324,12 +325,12 @@ public UpdateNode plan(Update node) ImmutableList.Builder orderedColumnValuesBuilder = ImmutableList.builder(); for (Field field : descriptor.getAllFields()) { String name = field.getName().get(); + VariableReferenceExpression variable = variableAllocator.newVariable(getSourceLocation(field.getNodeLocation()), name, field.getType()); + outputVariablesBuilder.add(variable); + columns.put(variable, analysis.getColumn(field)); + fields.add(field); int index = targetColumnNames.indexOf(name); if (index >= 0) { - VariableReferenceExpression variable = variableAllocator.newVariable(getSourceLocation(field.getNodeLocation()), field.getName().get(), field.getType()); - outputVariablesBuilder.add(variable); - columns.put(variable, analysis.getColumn(field)); - fields.add(field); orderedColumnValuesBuilder.add(node.getAssignments().get(index).getValue()); } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java index 2396eb32b8265..c4707b487744e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/SplitSourceFactory.java @@ -61,6 +61,7 @@ import com.facebook.presto.sql.planner.plan.TableWriterMergeNode; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.UnnestNode; +import com.facebook.presto.sql.planner.plan.UpdateNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -378,6 +379,12 @@ public Map visitDelete(DeleteNode node, Context context return node.getSource().accept(this, context); } + @Override + public Map visitUpdate(UpdateNode node, Context context) + { + return node.getSource().accept(this, context); + } + @Override public Map visitMetadataDelete(MetadataDeleteNode node, Context context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PruneUpdateSourceColumns.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PruneUpdateSourceColumns.java new file mode 100644 index 0000000000000..28e72617dec51 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PruneUpdateSourceColumns.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.sql.planner.iterative.rule; + +import com.facebook.presto.matching.Captures; +import com.facebook.presto.matching.Pattern; +import com.facebook.presto.sql.planner.iterative.Rule; +import com.facebook.presto.sql.planner.plan.UpdateNode; +import com.google.common.collect.ImmutableSet; + +import static com.facebook.presto.sql.planner.iterative.rule.Util.restrictChildOutputs; +import static com.facebook.presto.sql.planner.plan.Patterns.update; + +public class PruneUpdateSourceColumns + implements Rule +{ + private static final Pattern PATTERN = update(); + + @Override + public Pattern getPattern() + { + return PATTERN; + } + + @Override + public Result apply(UpdateNode updateNode, Captures captures, Context context) + { + return restrictChildOutputs(context.getIdAllocator(), updateNode, ImmutableSet.copyOf(updateNode.getColumnValueAndRowIdSymbols())) + .map(Result::ofPlanNode) + .orElse(Result.empty()); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java index 0cacb674285ee..71e64a09370a5 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java @@ -70,6 +70,7 @@ import com.facebook.presto.sql.planner.plan.TableWriterMergeNode; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.UnnestNode; +import com.facebook.presto.sql.planner.plan.UpdateNode; import com.facebook.presto.sql.relational.RowExpressionDomainTranslator; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableList; @@ -384,6 +385,12 @@ public ActualProperties visitDelete(DeleteNode node, List inpu return Iterables.getOnlyElement(inputProperties).translateVariable(symbol -> Optional.empty()); } + @Override + public ActualProperties visitUpdate(UpdateNode node, List inputProperties) + { + return Iterables.getOnlyElement(inputProperties).translateVariable(symbol -> Optional.empty()); + } + @Override public ActualProperties visitJoin(JoinNode node, List inputProperties) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java index 2d8e8e5737a0f..921bb4ec3a2a7 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java @@ -67,6 +67,7 @@ import com.facebook.presto.sql.planner.plan.TableWriterMergeNode; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.UnnestNode; +import com.facebook.presto.sql.planner.plan.UpdateNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; @@ -813,6 +814,12 @@ public PlanNode visitDelete(DeleteNode node, RewriteContext> context) + { + return new UpdateNode(node.getSourceLocation(), node.getId(), node.getSource(), node.getRowId(), node.getColumnValueAndRowIdSymbols(), node.getOutputVariables()); + } + @Override public PlanNode visitUnion(UnionNode node, RewriteContext> context) { diff --git 
a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java index 8dbaa277c1e19..24e29dfd57c81 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java @@ -59,6 +59,7 @@ import com.facebook.presto.sql.planner.plan.TableWriterMergeNode; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.UnnestNode; +import com.facebook.presto.sql.planner.plan.UpdateNode; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -450,6 +451,13 @@ public StreamProperties visitTableWriter(TableWriterNode node, List inputProperties) + { + StreamProperties properties = Iterables.getOnlyElement(inputProperties); + return properties.withUnspecifiedPartitioning(); + } + @Override public StreamProperties visitTableWriteMerge(TableWriterMergeNode node, List inputProperties) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java index 9ff8ce55cfedf..6344ebffac9bc 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java @@ -74,6 +74,7 @@ import com.facebook.presto.sql.planner.plan.TableWriterMergeNode; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.UnnestNode; +import com.facebook.presto.sql.planner.plan.UpdateNode; import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator; import com.facebook.presto.sql.tree.SymbolReference; import com.google.common.base.Preconditions; @@ -452,6 +453,12 @@ public PlanNode visitDelete(DeleteNode node, RewriteContext context) return new DeleteNode(node.getSourceLocation(), node.getId(), context.rewrite(node.getSource()), canonicalize(node.getRowId()), node.getOutputVariables(), node.getInputDistribution()); } + @Override + public PlanNode visitUpdate(UpdateNode node, RewriteContext context) + { + return new UpdateNode(node.getSourceLocation(), node.getId(), node.getSource(), canonicalize(node.getRowId()), node.getColumnValueAndRowIdSymbols(), node.getOutputVariables()); + } + @Override public PlanNode visitStatisticsWriterNode(StatisticsWriterNode node, RewriteContext context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/Patterns.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/Patterns.java index df817f60799ec..8bc7a49cc1500 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/Patterns.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/Patterns.java @@ -77,6 +77,10 @@ public static Pattern delete() { return typeOf(DeleteNode.class); } + public static Pattern update() + { + return typeOf(UpdateNode.class); + } public static Pattern exchange() { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/UpdateNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/UpdateNode.java index 
1b2e8c89e21d4..7a50e3aa0da05 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/UpdateNode.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/UpdateNode.java @@ -91,6 +91,7 @@ public List getSources() return ImmutableList.of(source); } + @JsonProperty @Override public List getOutputVariables() { @@ -112,6 +113,6 @@ public PlanNode assignStatsEquivalentPlanNode(Optional statsEquivalent @Override public PlanNode replaceChildren(List newChildren) { - return new UpdateNode(getSourceLocation(), getId(), Optional.of(Iterables.getOnlyElement(newChildren)), source, rowId, columnValueAndRowIdSymbols, outputVariables); + return new UpdateNode(getSourceLocation(), getId(), getStatsEquivalentPlanNode(), Iterables.getOnlyElement(newChildren), rowId, columnValueAndRowIdSymbols, outputVariables); } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java index fe229ddc88b92..cb348844589b1 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java @@ -98,6 +98,7 @@ import com.facebook.presto.sql.planner.plan.TableWriterMergeNode; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.UnnestNode; +import com.facebook.presto.sql.planner.plan.UpdateNode; import com.facebook.presto.sql.relational.FunctionResolution; import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator; import com.facebook.presto.sql.tree.ComparisonExpression; @@ -1241,6 +1242,14 @@ public Void visitDelete(DeleteNode node, Void context) return processChildren(node, context); } + @Override + public Void visitUpdate(UpdateNode node, Void context) + { + addNode(node, "Update"); + + return processChildren(node, context); + } + @Override public Void visitMetadataDelete(MetadataDeleteNode node, Void context) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateDependenciesChecker.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateDependenciesChecker.java index b566528f6bd8d..dd597c3788757 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateDependenciesChecker.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateDependenciesChecker.java @@ -71,6 +71,7 @@ import com.facebook.presto.sql.planner.plan.TableWriterMergeNode; import com.facebook.presto.sql.planner.plan.TopNRowNumberNode; import com.facebook.presto.sql.planner.plan.UnnestNode; +import com.facebook.presto.sql.planner.plan.UpdateNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -617,6 +618,16 @@ public Void visitDelete(DeleteNode node, Set boundV return null; } + @Override + public Void visitUpdate(UpdateNode node, Set boundVariables) + { + PlanNode source = node.getSource(); + source.accept(this, boundVariables); // visit child + checkArgument(source.getOutputVariables().contains(node.getRowId()), "Invalid node. Row ID symbol (%s) is not in source plan output (%s)", node.getRowId(), node.getSource().getOutputVariables()); + checkArgument(source.getOutputVariables().containsAll(node.getColumnValueAndRowIdSymbols()), "Invalid node. 
Some UPDATE SET expression symbols (%s) are not contained in the outputSymbols (%s)", node.getColumnValueAndRowIdSymbols(), source.getOutputVariables());
+
+        return null;
+    }
 
     @Override
     public Void visitMetadataDelete(MetadataDeleteNode node, Set<VariableReferenceExpression> boundVariables)
diff --git a/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java b/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java
index f1d880ebf1804..9edd782e049b7 100644
--- a/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java
+++ b/presto-main/src/main/java/com/facebook/presto/util/GraphvizPrinter.java
@@ -68,6 +68,7 @@
 import com.facebook.presto.sql.planner.plan.TableWriterMergeNode;
 import com.facebook.presto.sql.planner.plan.TopNRowNumberNode;
 import com.facebook.presto.sql.planner.plan.UnnestNode;
+import com.facebook.presto.sql.planner.plan.UpdateNode;
 import com.facebook.presto.sql.planner.planPrinter.RowExpressionFormatter;
 import com.facebook.presto.sql.tree.ComparisonExpression;
 import com.facebook.presto.sql.tree.Expression;
@@ -129,6 +130,7 @@ private enum NodeType
         UNNEST,
         ANALYZE_FINISH,
         EXPLAIN_ANALYZE,
+        UPDATE,
     }
 
     private static final Map<NodeType, String> NODE_COLORS = immutableEnumMap(ImmutableMap.<NodeType, String>builder()
@@ -159,6 +161,7 @@ private enum NodeType
             .put(NodeType.SAMPLE, "goldenrod4")
             .put(NodeType.ANALYZE_FINISH, "plum")
             .put(NodeType.EXPLAIN_ANALYZE, "cadetblue1")
+            .put(NodeType.UPDATE, "blue")
             .build());
 
     static {
@@ -311,6 +314,13 @@ public Void visitTableWriteMerge(TableWriterMergeNode node, Void context)
         return node.getSource().accept(this, context);
     }
 
+    @Override
+    public Void visitUpdate(UpdateNode node, Void context)
+    {
+        printNode(node, format("UpdateNode[%s]", Joiner.on(", ").join(node.getOutputVariables())), NODE_COLORS.get(NodeType.UPDATE));
+        return node.getSource().accept(this, context);
+    }
+
     @Override
     public Void visitStatisticsWriterNode(StatisticsWriterNode node, Void context)
     {
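The section closes the same way much of the patch reads: one more plan visitor, one more visitUpdate. A toy sketch of why each visitor needs its own override (invented types, not the real PlanVisitor API): visitors that only care about plan shape inherit a default, while visitors with node-specific behavior, such as GraphvizPrinter's coloring above, override it.

    public final class VisitorSketch
    {
        interface Visitor<R>
        {
            R visitPlan(String node);

            // New node kinds get a default that falls back to visitPlan, so existing
            // visitors keep working until they have a reason to special-case UPDATE.
            default R visitUpdate(String node)
            {
                return visitPlan(node);
            }
        }

        public static void main(String[] args)
        {
            Visitor<String> renderer = new Visitor<String>()
            {
                public String visitPlan(String node)
                {
                    return node + " [color=grey]";
                }

                @Override
                public String visitUpdate(String node)
                {
                    return node + " [color=blue]"; // mirrors NODE_COLORS.put(UPDATE, "blue")
                }
            };
            System.out.println(renderer.visitUpdate("UpdateNode")); // UpdateNode [color=blue]
        }
    }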