Add support for row-level deletes
s-akhtar-baig authored and tdcmeehan committed Feb 21, 2024
1 parent 91b0bb3 commit 14620ca
Showing 22 changed files with 778 additions and 61 deletions.
32 changes: 18 additions & 14 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -238,11 +238,11 @@ connector using a WITH clause:

The following table properties are available, which are specific to the Presto Iceberg connector:

========================================= ===============================================================
Property Name Description
========================================= ===============================================================
``format`` Optionally specifies the format of table data files,
either ``PARQUET`` or ``ORC``. Defaults to ``PARQUET``.
======================================= =============================================================== ============
Property Name Description Default
======================================= =============================================================== ============
``format`` Optionally specifies the format of table data files, ``PARQUET``
either ``PARQUET`` or ``ORC``.

``partitioning`` Optionally specifies table partitioning. If a table
is partitioned by columns ``c1`` and ``c2``, the partitioning
@@ -251,14 +251,16 @@ Property Name Description
``location`` Optionally specifies the file system location URI for
the table.

``format_version`` Optionally specifies the format version of the Iceberg
``format_version`` Optionally specifies the format version of the Iceberg ``2``
specification to use for new tables, either ``1`` or ``2``.
Defaults to ``1``.

``commit_retries`` Determines the number of attempts for committing the metadata
in case of concurrent upsert requests, before failing. The
default value is 4.
========================================= ===============================================================
``commit_retries`` Determines the number of attempts for committing the metadata ``4``
in case of concurrent upsert requests, before failing.

``delete_mode`` Optionally specifies the write delete mode of the Iceberg ``merge-on-read``
specification to use for new tables, either ``copy-on-write``
or ``merge-on-read``.
======================================= =============================================================== ============
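
A minimal, hypothetical sketch of a table that opts into row-level deletes follows; the
catalog, schema, and table names are invented, and the property values are written as
strings on the assumption that the connector defines them as string-typed table properties::

    CREATE TABLE iceberg.example_schema.example_events (
        id bigint,
        payload varchar
    )
    WITH (
        format = 'PARQUET',
        format_version = '2',
        delete_mode = 'merge-on-read'
    )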

The table definition below specifies format ``ORC``, partitioning by columns ``c1`` and ``c2``,
and a file system location of ``s3://test_bucket/test_schema/test_table``:
@@ -791,7 +793,7 @@ dropping the table from the metadata catalog using ``TRUNCATE TABLE``.
DELETE
^^^^^^^^

The Iceberg connector can delete data in one or more entire partitions from tables by using ``DELETE FROM``. For example, to delete from the table ``lineitem``::
The Iceberg connector can delete data from tables by using ``DELETE FROM``. For example, to delete from the table ``lineitem``::

DELETE FROM lineitem;

@@ -801,12 +803,14 @@ The Iceberg connector can delete data in one or more entire partitions from tabl

.. note::

Columns in the filter must all be identity transformed partition columns of the target table.

Filtered columns only support comparison operators, such as EQUALS, LESS THAN, or LESS THAN OR EQUALS.

Deletes must only occur on the latest snapshot.

For V1 tables, the Iceberg connector can only delete data in one or more entire
partitions. Columns in the filter must all be identity transformed partition
columns of the target table.
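
As an illustrative sketch only, assuming the target table uses format version ``2`` with the
``merge-on-read`` delete mode and that ``lineitem`` has a ``linenumber`` column, a filtered
row-level delete looks like::

    DELETE FROM lineitem WHERE linenumber = 3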

DROP TABLE
^^^^^^^^^^^

@@ -25,19 +25,28 @@ public class CommitTaskData
private final String path;
private final long fileSizeInBytes;
private final MetricsWrapper metrics;
private final int partitionSpecId;
private final Optional<String> partitionDataJson;
private final FileFormat fileFormat;
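// For position delete files: the data file whose rows this delete references. It is used in
// finishDelete to validate that the referenced file still exists at commit time, and may be
// null when not applicable (for example, for plain data file writes).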
private final Optional<String> referencedDataFile;

@JsonCreator
public CommitTaskData(
@JsonProperty("path") String path,
@JsonProperty("fileSizeInBytes") long fileSizeInBytes,
@JsonProperty("metrics") MetricsWrapper metrics,
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson)
@JsonProperty("partitionSpecId") int partitionSpecId,
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("fileFormat") FileFormat fileFormat,
@JsonProperty("referencedDataFile") String referencedDataFile)
{
this.path = requireNonNull(path, "path is null");
this.fileSizeInBytes = fileSizeInBytes;
this.metrics = requireNonNull(metrics, "metrics is null");
this.partitionSpecId = partitionSpecId;
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.referencedDataFile = Optional.ofNullable(referencedDataFile);
}

@JsonProperty
@@ -58,9 +67,27 @@ public MetricsWrapper getMetrics()
return metrics;
}

@JsonProperty
public int getPartitionSpecId()
{
return partitionSpecId;
}

@JsonProperty
public Optional<String> getPartitionDataJson()
{
return partitionDataJson;
}

@JsonProperty
public FileFormat getFileFormat()
{
return fileFormat;
}

@JsonProperty
public Optional<String> getReferencedDataFile()
{
return referencedDataFile;
}
}
@@ -68,10 +68,13 @@
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
@@ -82,6 +85,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;

import java.io.IOException;
import java.util.ArrayList;
@@ -98,7 +102,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.MetadataUtils.createPredicate;
import static com.facebook.presto.hive.MetadataUtils.getCombinedRemainingPredicate;
@@ -110,25 +113,29 @@
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.primitiveIcebergColumnHandle;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
import static com.facebook.presto.iceberg.IcebergTableProperties.DELETE_MODE;
import static com.facebook.presto.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.FORMAT_VERSION;
import static com.facebook.presto.iceberg.IcebergTableProperties.LOCATION_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.IcebergUtil.MIN_FORMAT_VERSION_FOR_DELETE;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdAsOfTime;
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetLocation;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetSchema;
import static com.facebook.presto.iceberg.IcebergUtil.validateTableMode;
import static com.facebook.presto.iceberg.PartitionFields.getPartitionColumnName;
@@ -148,6 +155,7 @@
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataColumns.ROW_POSITION;

public abstract class IcebergAbstractMetadata
implements ConnectorMetadata
@@ -473,7 +481,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return primitiveIcebergColumnHandle(0, "$row_id", BIGINT, Optional.empty());
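// Row-level deletes identify target rows by their position within a data file, so Iceberg's
// row-position metadata column is exposed as the delete row id.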
return IcebergColumnHandle.create(ROW_POSITION, typeManager, IcebergColumnHandle.ColumnType.REGULAR);
}

@Override
@@ -509,6 +517,9 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
if (!icebergTable.location().isEmpty()) {
properties.put(LOCATION_PROPERTY, icebergTable.location());
}

properties.put(DELETE_MODE, IcebergUtil.getDeleteMode(icebergTable));

return properties.build();
}

@@ -699,7 +710,8 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
})
.orElseGet(() -> resolveSnapshotIdByName(table, name));

// Get Iceberg tables schema with missing filesystem metadata will fail.
// Getting an Iceberg table's schema, properties, or location will fail if the
// filesystem metadata is missing.
// See https://github.com/prestodb/presto/pull/21181
Optional<Schema> tableSchema = tryGetSchema(table);
Optional<String> tableSchemaJson = tableSchema.map(SchemaParser::toJson);
Expand All @@ -709,6 +721,8 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
new IcebergTableName(name.getTableName(), name.getTableType(), tableSnapshotId, name.getChangelogEndSnapshot()),
name.getSnapshotId().isPresent(),
TupleDomain.all(),
tryGetLocation(table),
tryGetProperties(table),
tableSchemaJson,
Optional.empty(),
Optional.empty());
@@ -752,10 +766,71 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());

if (handle.isSnapshotSpecified()) {
throw new PrestoException(NOT_SUPPORTED, "This connector do not allow delete data at specified snapshot");
}
throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely");

int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
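// Position delete files are only defined from Iceberg format version 2 on; v1 tables remain limited to whole-partition deletes.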
throw new PrestoException(NOT_SUPPORTED, format("This connector only supports delete where one or more partitions are deleted entirely for table versions older than %d", MIN_FORMAT_VERSION_FOR_DELETE));
}

if (getDeleteMode(icebergTable) == RowLevelOperationMode.COPY_ON_WRITE) {
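// Only merge-on-read tables accept row-level deletes here; copy-on-write tables remain limited to whole-partition deletes.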
throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely. Configure delete_mode table property to allow row level deletions.");
}

validateTableMode(session, icebergTable);
transaction = icebergTable.newTransaction();
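// The position delete files produced for this operation are attached to a RowDelta and committed in finishDelete.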

return handle;
}

@Override
public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());

RowDelta rowDelta = transaction.newRowDelta();

List<CommitTaskData> commitTasks = fragments.stream()
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());

CharSequenceSet referencedDataFiles = CharSequenceSet.empty();

for (CommitTaskData task : commitTasks) {
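// Rebuild the position delete file described by this commit task and stage it in the RowDelta.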
PartitionSpec spec = icebergTable.specs().get(task.getPartitionSpecId());
FileMetadata.Builder builder = FileMetadata.deleteFileBuilder(spec)
.ofPositionDeletes()
.withPath(task.getPath())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withFormat(FileFormat.fromString(task.getFileFormat().name()))
.withMetrics(task.getMetrics().metrics());

if (!spec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
Type[] partitionColumnTypes = spec.fields().stream()
.map(field -> field.transform().getResultType(
spec.schema().findType(field.sourceId())))
.toArray(Type[]::new);
builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}
rowDelta.addDeletes(builder.build());
if (task.getReferencedDataFile().isPresent()) {
referencedDataFiles.add(task.getReferencedDataFile().get());
}
}

if (!referencedDataFiles.isEmpty()) {
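// Fail the commit if any data file referenced by a position delete no longer exists.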
rowDelta.validateDataFilesExist(referencedDataFiles);
}

rowDelta.commit();
transaction.commitTransaction();
}

@Override
@@ -35,7 +35,9 @@ public enum IcebergErrorCode
ICEBERG_FILESYSTEM_ERROR(8, EXTERNAL),
ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
ICEBERG_INVALID_TABLE_TIMESTAMP(12, USER_ERROR);
ICEBERG_INVALID_TABLE_TIMESTAMP(12, USER_ERROR),
ICEBERG_ROLLBACK_ERROR(13, EXTERNAL),
ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR);

private final ErrorCode errorCode;

@@ -165,7 +165,10 @@ public CompletableFuture<Collection<Slice>> finish()
context.getPath().toString(),
context.writer.getFileSizeInBytes(),
new MetricsWrapper(context.writer.getMetrics()),
context.getPartitionData().map(PartitionData::toJson));
partitionSpec.specId(),
context.getPartitionData().map(PartitionData::toJson),
fileFormat,
null);

commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
}