Skip to content

Commit

Permalink
Fixed Iceberg Delete as proof of concept. Fixed missing serialization…
Browse files Browse the repository at this point in the history
… changes.
  • Loading branch information
shelton408 committed Feb 11, 2025
1 parent b450564 commit 281756b
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.facebook.presto.hive.statistics.HiveStatisticsProvider;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorMetadataUpdateHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
Expand Down Expand Up @@ -2517,7 +2518,7 @@ public Optional<List<SchemaTableName>> getReferencedMaterializedViews(ConnectorS
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -856,12 +857,22 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());

if (handle.isSnapshotSpecified()) {
IcebergDeleteTableHandle deleteHandle = new IcebergDeleteTableHandle(
handle.getSchemaName(),
handle.getIcebergTableName(),
handle.isSnapshotSpecified(),
handle.getOutputPath(),
handle.getStorageProperties(),
handle.getTableSchemaJson(),
handle.getPartitionSpecId(),
handle.getEqualityFieldIds()
);
Table icebergTable = getIcebergTable(session, deleteHandle.getSchemaTableName());

if (deleteHandle.isSnapshotSpecified()) {
throw new PrestoException(NOT_SUPPORTED, "This connector do not allow delete data at specified snapshot");
}

Expand All @@ -877,13 +888,13 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
validateTableMode(session, icebergTable);
transaction = icebergTable.newTransaction();

return handle;
return deleteHandle;
}

@Override
public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
public void finishDelete(ConnectorSession session, ConnectorDeleteTableHandle tableHandle, Collection<Slice> fragments)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
IcebergDeleteTableHandle handle = (IcebergDeleteTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());

RowDelta rowDelta = transaction.newRowDelta();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.hive.BaseHiveTableHandle;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static java.util.Objects.requireNonNull;

public class IcebergDeleteTableHandle
implements ConnectorDeleteTableHandle
{
private final String schemaName;
private final String tableName;
private final IcebergTableName icebergTableName;
private final boolean snapshotSpecified;
private final Optional<String> outputPath;
private final Optional<Map<String, String>> storageProperties;
private final Optional<String> tableSchemaJson;
private final Optional<Set<Integer>> partitionFieldIds;
private final Optional<Set<Integer>> equalityFieldIds;

@JsonCreator
public IcebergDeleteTableHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("icebergTableName") IcebergTableName icebergTableName,
@JsonProperty("snapshotSpecified") boolean snapshotSpecified,
@JsonProperty("outputPath") Optional<String> outputPath,
@JsonProperty("storageProperties") Optional<Map<String, String>> storageProperties,
@JsonProperty("tableSchemaJson") Optional<String> tableSchemaJson,
@JsonProperty("partitionFieldIds") Optional<Set<Integer>> partitionFieldIds,
@JsonProperty("equalityFieldIds") Optional<Set<Integer>> equalityFieldIds)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(icebergTableName.getTableName(), "tableName is null");
this.icebergTableName = requireNonNull(icebergTableName, "tableName is null");
this.snapshotSpecified = snapshotSpecified;
this.outputPath = requireNonNull(outputPath, "filePrefix is null");
this.storageProperties = requireNonNull(storageProperties, "storageProperties is null");
this.tableSchemaJson = requireNonNull(tableSchemaJson, "tableSchemaJson is null");
this.partitionFieldIds = requireNonNull(partitionFieldIds, "partitionFieldIds is null");
this.equalityFieldIds = requireNonNull(equalityFieldIds, "equalityFieldIds is null");
}

@JsonProperty
public IcebergTableName getIcebergTableName()
{
return icebergTableName;
}

@JsonProperty
public boolean isSnapshotSpecified()
{
return snapshotSpecified;
}

@JsonProperty
public Optional<String> getTableSchemaJson()
{
return tableSchemaJson;
}

@JsonProperty
public Optional<String> getOutputPath()
{
return outputPath;
}

@JsonProperty
public Optional<Map<String, String>> getStorageProperties()
{
return storageProperties;
}

@JsonProperty
public Optional<Set<Integer>> getPartitionSpecId()
{
return partitionFieldIds;
}

@JsonProperty
public Optional<Set<Integer>> getEqualityFieldIds()
{
return equalityFieldIds;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

IcebergDeleteTableHandle that = (IcebergDeleteTableHandle) o;
return Objects.equals(getSchemaName(), that.getSchemaName()) &&
Objects.equals(icebergTableName, that.icebergTableName) &&
snapshotSpecified == that.snapshotSpecified &&
Objects.equals(tableSchemaJson, that.tableSchemaJson) &&
Objects.equals(equalityFieldIds, that.equalityFieldIds);
}

@Override
public int hashCode()
{
return Objects.hash(getSchemaName(), icebergTableName, snapshotSpecified, tableSchemaJson, equalityFieldIds);
}

@Override
public String toString()
{
return icebergTableName.toString();
}

@JsonProperty
public String getSchemaName()
{
return schemaName;
}

@JsonProperty
public String getTableName()
{
return tableName;
}

public SchemaTableName getSchemaTableName()
{
return new SchemaTableName(schemaName, tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.hive.HiveTransactionHandle;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -62,6 +63,12 @@ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
return IcebergInsertTableHandle.class;
}

@Override
public Class<? extends ConnectorDeleteTableHandle> getDeleteTableHandleClass()
{
return IcebergDeleteTableHandle.class;
}

@Override
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;

import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;

import javax.inject.Inject;

public class DeleteTableHandleJacksonModule
extends AbstractTypedJacksonModule<ConnectorDeleteTableHandle>
{
@Inject
public DeleteTableHandleJacksonModule(HandleResolver handleResolver)
{
super(ConnectorDeleteTableHandle.class,
handleResolver::getId,
handleResolver::getDeleteTableHandleClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void configure(Binder binder)
jsonBinder(binder).addModuleBinding().to(SplitJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(DeleteTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTab
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(Session session, DeleteTableHandle tableHandle)
public ColumnHandle getDeleteRowIdColumnHandle(Session session, TableHandle tableHandle)
{
ConnectorId connectorId = tableHandle.getConnectorId();
ConnectorMetadata metadata = getMetadata(session, connectorId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ default Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
* These IDs will be passed to the {@code deleteRows()} method of the
* {@link com.facebook.presto.spi.UpdatablePageSource} that created them.
*/
default ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorDeleteTableHandle tableHandle)
default ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new PrestoException(NOT_SUPPORTED, "This connector does not support deletes");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ public void renameView(ConnectorSession session, SchemaTableName viewName, Schem
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorDeleteTableHandle tableHandle)
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getDeleteRowIdColumnHandle(session, tableHandle);
Expand Down

0 comments on commit 281756b

Please sign in to comment.