From 281756b4686ec973f2592ec34e98732c6a4acfc3 Mon Sep 17 00:00:00 2001
From: Shelton Cai
Date: Tue, 11 Feb 2025 13:32:18 -0800
Subject: [PATCH] Fixed Iceberg Delete as proof of concept. Fixed missing serialization changes.

---
Review notes (this section sits after the "---" marker and is ignored by git-am):
* The line structure, indentation and generic type arguments of this patch were
  reconstructed from a whitespace-collapsed copy; hunk line counts were re-derived.
  Context lines of pre-existing files should be confirmed against the base tree.
* IcebergDeleteTableHandle: icebergTableName is now null-checked before it is
  dereferenced, null-check messages name the right field, the partitionFieldIds
  getter is pinned to the JSON name its @JsonCreator parameter uses, and the unused
  BaseHiveTableHandle import is dropped.
* DeleteTableHandleJacksonModule: unused ConnectorInsertTableHandle import dropped,
  supertype given its type argument.
* The blob ids on the "index" lines of the two new files predate these edits, and
  the From: header carries no e-mail address in this copy; restore it before git-am.

 .../facebook/presto/hive/HiveMetadata.java    |   3 +-
 .../iceberg/IcebergAbstractMetadata.java      |  25 ++-
 .../iceberg/IcebergDeleteTableHandle.java     | 158 ++++++++++++++++++
 .../presto/iceberg/IcebergHandleResolver.java |   7 +
 .../presto/metadata/DeleteTableHandle.java    |   1 -
 .../DeleteTableHandleJacksonModule.java       |  33 ++++
 .../presto/metadata/HandleJsonModule.java     |   1 +
 .../presto/metadata/MetadataManager.java      |   2 +-
 .../spi/connector/ConnectorMetadata.java      |   2 +-
 .../ClassLoaderSafeConnectorMetadata.java     |   2 +-
 10 files changed, 222 insertions(+), 12 deletions(-)
 create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDeleteTableHandle.java
 create mode 100644 presto-main/src/main/java/com/facebook/presto/metadata/DeleteTableHandleJacksonModule.java

diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java
index 30b6bd7577d56..d7d56ec073aaf 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java
@@ -43,6 +43,7 @@
 import com.facebook.presto.hive.statistics.HiveStatisticsProvider;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorDeleteTableHandle;
 import com.facebook.presto.spi.ConnectorInsertTableHandle;
 import com.facebook.presto.spi.ConnectorMetadataUpdateHandle;
 import com.facebook.presto.spi.ConnectorNewTableLayout;
@@ -2517,7 +2518,7 @@ public Optional<List<SchemaTableName>> getReferencedMaterializedViews(ConnectorS
     }
 
     @Override
-    public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
+    public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
         throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely");
     }
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
index 457896af447dd..c5f11126305c8 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
@@ -30,6 +30,7 @@
 import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorDeleteTableHandle;
 import com.facebook.presto.spi.ConnectorInsertTableHandle;
 import com.facebook.presto.spi.ConnectorNewTableLayout;
 import com.facebook.presto.spi.ConnectorOutputTableHandle;
@@ -856,12 +857,22 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
     }
 
     @Override
-    public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
+    public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
         IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
-        Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
-
-        if (handle.isSnapshotSpecified()) {
+        IcebergDeleteTableHandle deleteHandle = new IcebergDeleteTableHandle(
+                handle.getSchemaName(),
+                handle.getIcebergTableName(),
+                handle.isSnapshotSpecified(),
+                handle.getOutputPath(),
+                handle.getStorageProperties(),
+                handle.getTableSchemaJson(),
+                handle.getPartitionSpecId(),
+                handle.getEqualityFieldIds()
+        );
+        Table icebergTable = getIcebergTable(session, deleteHandle.getSchemaTableName());
+
+        if (deleteHandle.isSnapshotSpecified()) {
             throw new PrestoException(NOT_SUPPORTED, "This connector do not allow delete data at specified snapshot");
         }
 
@@ -877,13 +888,13 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
         validateTableMode(session, icebergTable);
         transaction = icebergTable.newTransaction();
 
-        return handle;
+        return deleteHandle;
     }
 
     @Override
-    public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
+    public void finishDelete(ConnectorSession session, ConnectorDeleteTableHandle tableHandle, Collection<Slice> fragments)
     {
-        IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
+        IcebergDeleteTableHandle handle = (IcebergDeleteTableHandle) tableHandle;
         Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
 
         RowDelta rowDelta = transaction.newRowDelta();
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDeleteTableHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDeleteTableHandle.java
new file mode 100644
index 0000000000000..dcf2d595787f4
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergDeleteTableHandle.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import com.facebook.presto.spi.ConnectorDeleteTableHandle;
+import com.facebook.presto.spi.SchemaTableName;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * JSON-serializable handle describing the Iceberg table targeted by a row-level delete.
+ * It carries a copy of the fields of the {@code IcebergTableHandle} the delete was started from;
+ * {@code tableName} is derived from {@code icebergTableName} rather than passed in separately.
+ */
+public class IcebergDeleteTableHandle
+        implements ConnectorDeleteTableHandle
+{
+    private final String schemaName;
+    private final String tableName;
+    private final IcebergTableName icebergTableName;
+    private final boolean snapshotSpecified;
+    private final Optional<String> outputPath;
+    private final Optional<Map<String, String>> storageProperties;
+    private final Optional<String> tableSchemaJson;
+    private final Optional<Set<Integer>> partitionFieldIds;
+    private final Optional<Set<Integer>> equalityFieldIds;
+
+    @JsonCreator
+    public IcebergDeleteTableHandle(
+            @JsonProperty("schemaName") String schemaName,
+            @JsonProperty("icebergTableName") IcebergTableName icebergTableName,
+            @JsonProperty("snapshotSpecified") boolean snapshotSpecified,
+            @JsonProperty("outputPath") Optional<String> outputPath,
+            @JsonProperty("storageProperties") Optional<Map<String, String>> storageProperties,
+            @JsonProperty("tableSchemaJson") Optional<String> tableSchemaJson,
+            @JsonProperty("partitionFieldIds") Optional<Set<Integer>> partitionFieldIds,
+            @JsonProperty("equalityFieldIds") Optional<Set<Integer>> equalityFieldIds)
+    {
+        this.schemaName = requireNonNull(schemaName, "schemaName is null");
+        // Validate icebergTableName before dereferencing it to derive tableName
+        this.icebergTableName = requireNonNull(icebergTableName, "icebergTableName is null");
+        this.tableName = requireNonNull(icebergTableName.getTableName(), "tableName is null");
+        this.snapshotSpecified = snapshotSpecified;
+        this.outputPath = requireNonNull(outputPath, "outputPath is null");
+        this.storageProperties = requireNonNull(storageProperties, "storageProperties is null");
+        this.tableSchemaJson = requireNonNull(tableSchemaJson, "tableSchemaJson is null");
+        this.partitionFieldIds = requireNonNull(partitionFieldIds, "partitionFieldIds is null");
+        this.equalityFieldIds = requireNonNull(equalityFieldIds, "equalityFieldIds is null");
+    }
+
+    @JsonProperty
+    public IcebergTableName getIcebergTableName()
+    {
+        return icebergTableName;
+    }
+
+    @JsonProperty
+    public boolean isSnapshotSpecified()
+    {
+        return snapshotSpecified;
+    }
+
+    @JsonProperty
+    public Optional<String> getTableSchemaJson()
+    {
+        return tableSchemaJson;
+    }
+
+    @JsonProperty
+    public Optional<String> getOutputPath()
+    {
+        return outputPath;
+    }
+
+    @JsonProperty
+    public Optional<Map<String, String>> getStorageProperties()
+    {
+        return storageProperties;
+    }
+
+    // Name pinned to match the @JsonCreator parameter; the bare getter name would serialize as "partitionSpecId"
+    @JsonProperty("partitionFieldIds")
+    public Optional<Set<Integer>> getPartitionSpecId()
+    {
+        return partitionFieldIds;
+    }
+
+    @JsonProperty
+    public Optional<Set<Integer>> getEqualityFieldIds()
+    {
+        return equalityFieldIds;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        IcebergDeleteTableHandle that = (IcebergDeleteTableHandle) o;
+        return Objects.equals(getSchemaName(), that.getSchemaName()) &&
+                Objects.equals(icebergTableName, that.icebergTableName) &&
+                snapshotSpecified == that.snapshotSpecified &&
+                Objects.equals(tableSchemaJson, that.tableSchemaJson) &&
+                Objects.equals(equalityFieldIds, that.equalityFieldIds);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(getSchemaName(), icebergTableName, snapshotSpecified, tableSchemaJson, equalityFieldIds);
+    }
+
+    @Override
+    public String toString()
+    {
+        return icebergTableName.toString();
+    }
+
+    @JsonProperty
+    public String getSchemaName()
+    {
+        return schemaName;
+    }
+
+    @JsonProperty
+    public String getTableName()
+    {
+        return tableName;
+    }
+
+    public SchemaTableName getSchemaTableName()
+    {
+        return new SchemaTableName(schemaName, tableName);
+    }
+}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java
index ca5e9fecdef3c..a9b5ce11cf21c 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHandleResolver.java
@@ -15,6 +15,7 @@
 
 import com.facebook.presto.hive.HiveTransactionHandle;
 import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorDeleteTableHandle;
 import com.facebook.presto.spi.ConnectorHandleResolver;
 import com.facebook.presto.spi.ConnectorInsertTableHandle;
 import com.facebook.presto.spi.ConnectorOutputTableHandle;
@@ -62,6 +63,12 @@ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
         return IcebergInsertTableHandle.class;
     }
 
+    @Override
+    public Class<? extends ConnectorDeleteTableHandle> getDeleteTableHandleClass()
+    {
+        return IcebergDeleteTableHandle.class;
+    }
+
     @Override
     public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
     {
diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java b/presto-main/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java
index 04f50d95b47bb..ea05b223843d5 100644
--- a/presto-main/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java
+++ b/presto-main/src/main/java/com/facebook/presto/metadata/DeleteTableHandle.java
@@ -15,7 +15,6 @@
 
 import com.facebook.presto.spi.ConnectorDeleteTableHandle;
 import com.facebook.presto.spi.ConnectorId;
-import com.facebook.presto.spi.ConnectorDeleteTableHandle;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DeleteTableHandleJacksonModule.java b/presto-main/src/main/java/com/facebook/presto/metadata/DeleteTableHandleJacksonModule.java
new file mode 100644
index 0000000000000..ad9d2bd62a79f
--- /dev/null
+++ b/presto-main/src/main/java/com/facebook/presto/metadata/DeleteTableHandleJacksonModule.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.metadata;
+
+import com.facebook.presto.spi.ConnectorDeleteTableHandle;
+
+import javax.inject.Inject;
+
+/**
+ * Jackson module for {@link ConnectorDeleteTableHandle}; the id and concrete class come from the {@link HandleResolver}.
+ */
+public class DeleteTableHandleJacksonModule
+        extends AbstractTypedJacksonModule<ConnectorDeleteTableHandle>
+{
+    @Inject
+    public DeleteTableHandleJacksonModule(HandleResolver handleResolver)
+    {
+        super(ConnectorDeleteTableHandle.class,
+                handleResolver::getId,
+                handleResolver::getDeleteTableHandleClass);
+    }
+}
diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java b/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java
index 0c07b99aaab4e..61eae56f41895 100644
--- a/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java
+++ b/presto-main/src/main/java/com/facebook/presto/metadata/HandleJsonModule.java
@@ -32,6 +32,7 @@ public void configure(Binder binder)
         jsonBinder(binder).addModuleBinding().to(SplitJacksonModule.class);
         jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class);
         jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class);
+        jsonBinder(binder).addModuleBinding().to(DeleteTableHandleJacksonModule.class);
         jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class);
         jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class);
         jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);
diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java
index a4d62ba9adadf..222ceac60ac40 100644
--- a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java
+++ b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java
@@ -861,7 +861,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTab
     }
 
     @Override
-    public ColumnHandle getDeleteRowIdColumnHandle(Session session, DeleteTableHandle tableHandle)
+    public ColumnHandle getDeleteRowIdColumnHandle(Session session, TableHandle tableHandle)
     {
         ConnectorId connectorId = tableHandle.getConnectorId();
         ConnectorMetadata metadata = getMetadata(session, connectorId);
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java
index 95e1950c323cc..73031bb83e11e 100644
--- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java
@@ -516,7 +516,7 @@ default Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
      * These IDs will be passed to the {@code deleteRows()} method of the
      * {@link com.facebook.presto.spi.UpdatablePageSource} that created them.
      */
-    default ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorDeleteTableHandle tableHandle)
+    default ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
         throw new PrestoException(NOT_SUPPORTED, "This connector does not support deletes");
     }
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java
index 34b4dc3ce5a5e..09f315f81535e 100644
--- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java
@@ -484,7 +484,7 @@ public void renameView(ConnectorSession session, SchemaTableName viewName, Schem
     }
 
     @Override
-    public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorDeleteTableHandle tableHandle)
+    public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
         try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
             return delegate.getDeleteRowIdColumnHandle(session, tableHandle);