diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
index 7c829626ddc9..73024812052f 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
@@ -44,6 +44,7 @@ public class DeltaLakeColumnHandle
 {
     private static final int INSTANCE_SIZE = instanceSize(DeltaLakeColumnHandle.class);
 
+    public static final String ROW_POSITION_COLUMN_NAME = "$row_position";
     public static final String ROW_ID_COLUMN_NAME = "$row_id";
 
     public static final Type MERGE_ROW_ID_TYPE = rowType(
@@ -218,6 +219,11 @@ public HiveColumnHandle toHiveColumnHandle()
                 Optional.empty());
     }
 
+    public static DeltaLakeColumnHandle rowPositionColumnHandle()
+    {
+        return new DeltaLakeColumnHandle(ROW_POSITION_COLUMN_NAME, BIGINT, OptionalInt.empty(), ROW_POSITION_COLUMN_NAME, BIGINT, SYNTHESIZED, Optional.empty());
+    }
+
     public static DeltaLakeColumnHandle pathColumnHandle()
     {
         return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED, Optional.empty());
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index dc97f035a1cd..29688a77201a 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -1654,7 +1654,8 @@ private static void appendAddFileEntries(TransactionLogWriter transactionLogWrit
                     dataChange,
                     Optional.of(serializeStatsAsJson(statisticsWithExactNames)),
                     Optional.empty(),
-                    ImmutableMap.of()));
+                    ImmutableMap.of(),
+                    Optional.empty()));
         }
     }
@@ -3084,7 +3085,8 @@ private AddFileEntry prepareUpdatedAddFileEntry(ComputedStatistics stats, AddFil
                     false,
                     Optional.of(serializeStatsAsJson(deltaLakeJsonFileStatistics)),
                     Optional.empty(),
-                    addFileEntry.getTags());
+                    addFileEntry.getTags(),
+                    addFileEntry.getDeletionVector());
         }
         catch (JsonProcessingException e) {
             throw new TrinoException(GENERIC_INTERNAL_ERROR, "Statistics serialization error", e);
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java
index 59f504fdf287..463d7bde5c5d 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java
@@ -15,6 +15,7 @@
 import io.airlift.json.JsonCodec;
 import io.airlift.json.JsonCodecFactory;
+import io.trino.plugin.deltalake.delete.PageFilter;
 import io.trino.plugin.hive.ReaderProjectionsAdapter;
 import io.trino.spi.Page;
 import io.trino.spi.TrinoException;
@@ -33,6 +34,7 @@
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.function.Supplier;
 
 import static com.google.common.base.Throwables.throwIfInstanceOf;
 import static io.airlift.slice.Slices.utf8Slice;
@@ -63,6 +65,7 @@ public class DeltaLakePageSource
     private final Block partitionsBlock;
     private final ConnectorPageSource delegate;
     private final Optional<ReaderProjectionsAdapter> projectionsAdapter;
+    private final Supplier<Optional<PageFilter>> deletePredicate;
 
     public DeltaLakePageSource(
             List<DeltaLakeColumnHandle>
columns, @@ -73,7 +76,8 @@ public DeltaLakePageSource( Optional projectionsAdapter, String path, long fileSize, - long fileModifiedTime) + long fileModifiedTime, + Supplier> deletePredicate) { int size = columns.size(); requireNonNull(partitionKeys, "partitionKeys is null"); @@ -131,6 +135,7 @@ else if (missingColumnNames.contains(column.getBaseColumnName())) { this.rowIdIndex = rowIdIndex; this.pathBlock = pathBlock; this.partitionsBlock = partitionsBlock; + this.deletePredicate = requireNonNull(deletePredicate, "deletePredicate is null"); } @Override @@ -168,6 +173,11 @@ public Page getNextPage() if (projectionsAdapter.isPresent()) { dataPage = projectionsAdapter.get().adaptPage(dataPage); } + Optional deleteFilterPredicate = deletePredicate.get(); + if (deleteFilterPredicate.isPresent()) { + dataPage = deleteFilterPredicate.get().apply(dataPage); + } + int batchSize = dataPage.getPositionCount(); Block[] blocks = new Block[prefilledBlocks.length]; for (int i = 0; i < prefilledBlocks.length; i++) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java index 05d8e5ccb70c..913144a0dd7e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java @@ -13,17 +13,22 @@ */ package io.trino.plugin.deltalake; +import com.google.common.base.Suppliers; import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.reader.MetadataReader; +import io.trino.plugin.deltalake.delete.PageFilter; +import io.trino.plugin.deltalake.delete.PositionDeleteFilter; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.HiveColumnHandle; @@ -35,6 +40,7 @@ import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.TrinoParquetDataSource; import io.trino.spi.Page; +import io.trino.spi.TrinoException; import io.trino.spi.block.Block; import io.trino.spi.block.LongArrayBlock; import io.trino.spi.connector.ColumnHandle; @@ -56,6 +62,7 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.joda.time.DateTimeZone; +import org.roaringbitmap.longlong.Roaring64NavigableMap; import java.io.IOException; import java.io.UncheckedIOException; @@ -63,6 +70,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -71,11 +79,13 @@ import static io.airlift.slice.SizeOf.SIZE_OF_LONG; import static io.trino.plugin.deltalake.DeltaHiveTypeTranslator.toHiveType; import static 
io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME; +import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.rowPositionColumnHandle; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; +import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex; -import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NONE; +import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode; import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.PARQUET_ROW_INDEX_COLUMN; @@ -172,6 +182,7 @@ public ConnectorPageSource createPageSource( if (filteredSplitPredicate.isAll() && split.getStart() == 0 && split.getLength() == split.getFileSize() && split.getFileRowCount().isPresent() && + split.getDeletionVector().isEmpty() && (regularColumns.isEmpty() || onlyRowIdColumn(regularColumns))) { return new DeltaLakePageSource( deltaLakeColumns, @@ -182,11 +193,13 @@ public ConnectorPageSource createPageSource( Optional.empty(), split.getPath(), split.getFileSize(), - split.getFileModifiedTime()); + split.getFileModifiedTime(), + Optional::empty); } Location location = Location.of(split.getPath()); - TrinoInputFile inputFile = fileSystemFactory.create(session).newInputFile(location, split.getFileSize()); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + TrinoInputFile inputFile = fileSystem.newInputFile(location, split.getFileSize()); ParquetReaderOptions options = parquetReaderOptions.withMaxReadBlockSize(getParquetMaxReadBlockSize(session)) .withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session)) .withUseColumnIndex(isParquetUseColumnIndex(session)); @@ -204,6 +217,9 @@ public ConnectorPageSource createPageSource( hiveColumnHandles::add, () -> missingColumnNames.add(column.getBaseColumnName())); } + if (split.getDeletionVector().isPresent() && !regularColumns.contains(rowPositionColumnHandle())) { + hiveColumnHandles.add(PARQUET_ROW_INDEX_COLUMN); + } TupleDomain parquetPredicate = getParquetTupleDomain(filteredSplitPredicate.simplify(domainCompactionThreshold), columnMappingMode, parquetFieldIdToName); @@ -227,6 +243,19 @@ public ConnectorPageSource createPageSource( column -> ((HiveColumnHandle) column).getType(), HivePageSourceProvider::getProjection)); + Supplier> deletePredicate = Suppliers.memoize(() -> { + if (split.getDeletionVector().isEmpty()) { + return Optional.empty(); + } + + List requiredColumns = ImmutableList.builderWithExpectedSize(deltaLakeColumns.size() + 1) + .addAll(deltaLakeColumns) + .add(rowPositionColumnHandle()) + .build(); + PositionDeleteFilter deleteFilter = readDeletes(fileSystem, Location.of(table.location()), split.getDeletionVector().get()); + return Optional.of(deleteFilter.createPredicate(requiredColumns)); + }); + return new DeltaLakePageSource( deltaLakeColumns, missingColumnNames.build(), @@ -236,7 +265,22 @@ public ConnectorPageSource createPageSource( projectionsAdapter, split.getPath(), split.getFileSize(), - split.getFileModifiedTime()); + 
split.getFileModifiedTime(), + deletePredicate); + } + + private PositionDeleteFilter readDeletes( + TrinoFileSystem fileSystem, + Location tableLocation, + DeletionVectorEntry deletionVector) + { + try { + Roaring64NavigableMap deletedRows = readDeletionVectors(fileSystem, tableLocation, deletionVector); + return new PositionDeleteFilter(deletedRows); + } + catch (IOException e) { + throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Failed to read deletion vectors", e); + } } public Map loadParquetIdAndNameMapping(TrinoInputFile inputFile, ParquetReaderOptions options) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java index d5398e2a4fa1..d750bd371f7f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.slice.SizeOf; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.spi.HostAddress; import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; @@ -47,6 +48,7 @@ public class DeltaLakeSplit private final long fileSize; private final Optional fileRowCount; private final long fileModifiedTime; + private final Optional deletionVector; private final SplitWeight splitWeight; private final TupleDomain statisticsPredicate; private final Map> partitionKeys; @@ -59,6 +61,7 @@ public DeltaLakeSplit( @JsonProperty("fileSize") long fileSize, @JsonProperty("rowCount") Optional fileRowCount, @JsonProperty("fileModifiedTime") long fileModifiedTime, + @JsonProperty("deletionVector") Optional deletionVector, @JsonProperty("splitWeight") SplitWeight splitWeight, @JsonProperty("statisticsPredicate") TupleDomain statisticsPredicate, @JsonProperty("partitionKeys") Map> partitionKeys) @@ -69,6 +72,7 @@ public DeltaLakeSplit( this.fileSize = fileSize; this.fileRowCount = requireNonNull(fileRowCount, "rowCount is null"); this.fileModifiedTime = fileModifiedTime; + this.deletionVector = requireNonNull(deletionVector, "deletionVector is null"); this.splitWeight = requireNonNull(splitWeight, "splitWeight is null"); this.statisticsPredicate = requireNonNull(statisticsPredicate, "statisticsPredicate is null"); this.partitionKeys = requireNonNull(partitionKeys, "partitionKeys is null"); @@ -130,6 +134,12 @@ public long getFileModifiedTime() return fileModifiedTime; } + @JsonProperty + public Optional getDeletionVector() + { + return deletionVector; + } + /** * A TupleDomain representing the min/max statistics from the file this split was generated from. This does not contain any partitioning information. 
*/ @@ -151,6 +161,7 @@ public long getRetainedSizeInBytes() return INSTANCE_SIZE + estimatedSizeOf(path) + sizeOf(fileRowCount, value -> LONG_INSTANCE_SIZE) + + sizeOf(deletionVector, DeletionVectorEntry::sizeInBytes) + splitWeight.getRetainedSizeInBytes() + statisticsPredicate.getRetainedSizeInBytes(DeltaLakeColumnHandle::getRetainedSizeInBytes) + estimatedSizeOf(partitionKeys, SizeOf::estimatedSizeOf, value -> sizeOf(value, SizeOf::estimatedSizeOf)); @@ -175,6 +186,7 @@ public String toString() .add("length", length) .add("fileSize", fileSize) .add("rowCount", fileRowCount) + .add("deletionVector", deletionVector) .add("statisticsPredicate", statisticsPredicate) .add("partitionKeys", partitionKeys) .toString(); @@ -195,6 +207,7 @@ public boolean equals(Object o) fileSize == that.fileSize && path.equals(that.path) && fileRowCount.equals(that.fileRowCount) && + deletionVector.equals(that.deletionVector) && Objects.equals(statisticsPredicate, that.statisticsPredicate) && Objects.equals(partitionKeys, that.partitionKeys); } @@ -202,6 +215,6 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(path, start, length, fileSize, fileRowCount, statisticsPredicate, partitionKeys); + return Objects.hash(path, start, length, fileSize, fileRowCount, deletionVector, statisticsPredicate, partitionKeys); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index f2bd2e197bff..b8d16d606fdd 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -290,6 +290,7 @@ private List splitsForFile( fileSize, addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords), addFileEntry.getModificationTime(), + addFileEntry.getDeletionVector(), SplitWeight.standard(), statisticsPredicate, partitionKeys)); @@ -314,6 +315,7 @@ private List splitsForFile( fileSize, Optional.empty(), addFileEntry.getModificationTime(), + addFileEntry.getDeletionVector(), SplitWeight.fromProportion(Math.min(Math.max((double) splitSize / maxSplitSize, minimumAssignedSplitWeight), 1.0)), statisticsPredicate, partitionKeys)); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PageFilter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PageFilter.java new file mode 100644 index 000000000000..7e0a7438e594 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PageFilter.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.trino.plugin.deltalake.delete;
+
+import io.trino.spi.Page;
+
+import java.util.function.Function;
+
+public interface PageFilter
+        extends Function<Page, Page> {}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java
new file mode 100644
index 000000000000..42600b740c6f
--- /dev/null
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake.delete;
+
+import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
+import io.trino.spi.block.Block;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_POSITION_COLUMN_NAME;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static java.util.Objects.requireNonNull;
+
+public final class PositionDeleteFilter
+{
+    private final Roaring64NavigableMap deletedRows;
+
+    public PositionDeleteFilter(Roaring64NavigableMap deletedRows)
+    {
+        requireNonNull(deletedRows, "deletedRows is null");
+        checkArgument(!deletedRows.isEmpty(), "deletedRows is empty");
+        this.deletedRows = deletedRows;
+    }
+
+    public PageFilter createPredicate(List<DeltaLakeColumnHandle> columns)
+    {
+        int filePositionChannel = rowPositionChannel(columns);
+
+        return page -> {
+            int positionCount = page.getPositionCount();
+            int[] retained = new int[positionCount];
+            int retainedCount = 0;
+            Block block = page.getBlock(filePositionChannel);
+            for (int position = 0; position < positionCount; position++) {
+                long filePosition = BIGINT.getLong(block, position);
+                if (!deletedRows.contains(filePosition)) {
+                    retained[retainedCount] = position;
+                    retainedCount++;
+                }
+            }
+            if (retainedCount == positionCount) {
+                return page;
+            }
+            return page.getPositions(retained, 0, retainedCount);
+        };
+    }
+
+    private static int rowPositionChannel(List<DeltaLakeColumnHandle> columns)
+    {
+        for (int i = 0; i < columns.size(); i++) {
+            if (columns.get(i).getBaseColumnName().equals(ROW_POSITION_COLUMN_NAME)) {
+                return i;
+            }
+        }
+        throw new IllegalArgumentException("No row position column");
+    }
+}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java
index e69dd5a046c0..2298f0ddb577 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java
@@ -214,6 +214,7 @@ private static DeltaLakePageSource createDeltaLakePageSource(
                 Optional.empty(),
                 split.path(),
split.fileSize(), - 0L); + 0L, + Optional::empty); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java index 7ab1950d3195..5b6c96d3a31c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java @@ -33,6 +33,7 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.canonicalizePartitionValues; import static java.lang.String.format; +import static java.util.Objects.requireNonNull; public class AddFileEntry { @@ -46,6 +47,7 @@ public class AddFileEntry private final long modificationTime; private final boolean dataChange; private final Map tags; + private final Optional deletionVector; private final Optional parsedStats; @JsonCreator @@ -57,7 +59,8 @@ public AddFileEntry( @JsonProperty("dataChange") boolean dataChange, @JsonProperty("stats") Optional stats, @JsonProperty("parsedStats") Optional parsedStats, - @JsonProperty("tags") @Nullable Map tags) + @JsonProperty("tags") @Nullable Map tags, + @JsonProperty("deletionVector") Optional deletionVector) { this.path = path; this.partitionValues = partitionValues; @@ -66,6 +69,7 @@ public AddFileEntry( this.modificationTime = modificationTime; this.dataChange = dataChange; this.tags = tags; + this.deletionVector = requireNonNull(deletionVector, "deletionVector is null"); Optional resultParsedStats = Optional.empty(); if (parsedStats.isPresent()) { @@ -151,6 +155,12 @@ public Map getTags() return tags; } + @JsonProperty + public Optional getDeletionVector() + { + return deletionVector; + } + @Override public String toString() { @@ -175,6 +185,7 @@ public boolean equals(Object o) Objects.equals(partitionValues, that.partitionValues) && Objects.equals(canonicalPartitionValues, that.canonicalPartitionValues) && Objects.equals(tags, that.tags) && + Objects.equals(deletionVector, that.deletionVector) && Objects.equals(parsedStats, that.parsedStats); } @@ -189,6 +200,7 @@ public int hashCode() modificationTime, dataChange, tags, + deletionVector, parsedStats); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index 07afa6c237aa..fe8a154ee18f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -87,12 +87,13 @@ private DeltaLakeSchemaSupport() {} public static final String COLUMN_MAPPING_MODE_CONFIGURATION_KEY = "delta.columnMapping.mode"; public static final String COLUMN_MAPPING_PHYSICAL_NAME_CONFIGURATION_KEY = "delta.columnMapping.physicalName"; public static final String MAX_COLUMN_ID_CONFIGURATION_KEY = "delta.columnMapping.maxColumnId"; + private static final String DELETION_VECTORS_CONFIGURATION_KEY = "delta.enableDeletionVectors"; // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#valid-feature-names-in-table-features - // TODO: Add support for 'deletionVectors' reader features private static final Set SUPPORTED_READER_FEATURES = 
ImmutableSet.builder() .add("columnMapping") .add("timestampNtz") + .add("deletionVectors") .build(); public enum ColumnMappingMode @@ -124,6 +125,11 @@ public static boolean isAppendOnly(MetadataEntry metadataEntry) return parseBoolean(metadataEntry.getConfiguration().getOrDefault(APPEND_ONLY_CONFIGURATION_KEY, "false")); } + public static boolean isDeletionVectorEnabled(MetadataEntry metadataEntry) + { + return parseBoolean(metadataEntry.getConfiguration().get(DELETION_VECTORS_CONFIGURATION_KEY)); + } + public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata) { String columnMappingMode = metadata.getConfiguration().getOrDefault(COLUMN_MAPPING_MODE_CONFIGURATION_KEY, "none"); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java index e89c149a28a2..fcc5219f4eef 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java @@ -158,6 +158,11 @@ public List getJsonTransactionLogEntries() return logTail.getFileEntries(); } + public List getTransactions() + { + return logTail.getTransactions(); + } + public Stream getCheckpointTransactionLogEntries( ConnectorSession session, Set entryTypes, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/Transaction.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/Transaction.java new file mode 100644 index 000000000000..40b424845825 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/Transaction.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.trino.plugin.deltalake.transactionlog;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public record Transaction(long transactionId, List<DeltaLakeTransactionLogEntry> transactionEntries)
+{
+    public Transaction
+    {
+        checkArgument(transactionId >= 0, "transactionId must be >= 0");
+        transactionEntries = ImmutableList.copyOf(requireNonNull(transactionEntries, "transactionEntries is null"));
+    }
+}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java
index 4ab57f596eb1..c37284e67cf9 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java
@@ -51,6 +51,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.time.Instant;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -283,7 +284,7 @@ public static ImmutableList columnsWithStats(List
-    private Stream<AddFileEntry> activeAddEntries(Stream<AddFileEntry> checkpointEntries, Stream<DeltaLakeTransactionLogEntry> jsonEntries)
+    private Stream<AddFileEntry> activeAddEntries(Stream<AddFileEntry> checkpointEntries, List<Transaction> transactions)
     {
         Map<String, AddFileEntry> activeJsonEntries = new LinkedHashMap<>();
         HashSet<String> removedFiles = new HashSet<>();
@@ -291,17 +292,22 @@ private Stream activeAddEntries(Stream
-        jsonEntries.forEach(deltaLakeTransactionLogEntry -> {
-            AddFileEntry addEntry = deltaLakeTransactionLogEntry.getAdd();
-            if (addEntry != null) {
-                activeJsonEntries.put(addEntry.getPath(), addEntry);
-            }
+        transactions.forEach(transaction -> {
+            Map<String, AddFileEntry> addFilesInTransaction = new LinkedHashMap<>();
+            Set<String> removedFilesInTransaction = new HashSet<>();
+            transaction.transactionEntries().forEach(deltaLakeTransactionLogEntry -> {
+                if (deltaLakeTransactionLogEntry.getAdd() != null) {
+                    addFilesInTransaction.put(deltaLakeTransactionLogEntry.getAdd().getPath(), deltaLakeTransactionLogEntry.getAdd());
+                }
+                else if (deltaLakeTransactionLogEntry.getRemove() != null) {
+                    removedFilesInTransaction.add(deltaLakeTransactionLogEntry.getRemove().getPath());
+                }
+            });
-            RemoveFileEntry removeEntry = deltaLakeTransactionLogEntry.getRemove();
-            if (removeEntry != null) {
-                activeJsonEntries.remove(removeEntry.getPath());
-                removedFiles.add(removeEntry.getPath());
-            }
+            // Process 'remove' entries first because deletion vectors register both 'add' and 'remove' entries and the 'add' entry should be kept
+            removedFiles.addAll(removedFilesInTransaction);
+            removedFilesInTransaction.forEach(activeJsonEntries::remove);
+            activeJsonEntries.putAll(addFilesInTransaction);
         });
 
         Stream<AddFileEntry> filteredCheckpointEntries = checkpointEntries
@@ -367,19 +373,19 @@ public Stream getCommitInfoEntries(TableSnapshot tableSnapshot,
     private <T> Stream<T> getEntries(
             TableSnapshot tableSnapshot,
             Set<CheckpointEntryIterator.EntryType> entryTypes,
-            BiFunction<Stream<T>, Stream<DeltaLakeTransactionLogEntry>, Stream<T>> entryMapper,
+            BiFunction<Stream<T>, List<Transaction>, Stream<T>> entryMapper,
             ConnectorSession session,
             TrinoFileSystem fileSystem,
             FileFormatDataSourceStats stats)
     {
         try {
-            Stream<DeltaLakeTransactionLogEntry> jsonEntries = tableSnapshot.getJsonTransactionLogEntries().stream();
+            List<Transaction> transactions = tableSnapshot.getTransactions();
             Stream<DeltaLakeTransactionLogEntry> checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries(
                     session,
                     entryTypes,
                     checkpointSchemaManager,
                     typeManager,
                     fileSystem,
                     stats);
             return entryMapper.apply(
                     checkpointEntries,
-
jsonEntries); + transactions); } catch (IOException e) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error reading transaction log for " + tableSnapshot.getTable(), e); @@ -400,7 +406,7 @@ private Stream getEntries( return getEntries( tableSnapshot, ImmutableSet.of(entryType), - (checkpointStream, jsonStream) -> entryMapper.apply(Stream.concat(checkpointStream, jsonStream)), + (checkpointStream, jsonStream) -> entryMapper.apply(Stream.concat(checkpointStream, jsonStream.stream().map(Transaction::transactionEntries).flatMap(Collection::stream))), session, fileSystem, stats); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index deb9c69ad5f2..b990b535c1aa 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -24,6 +24,7 @@ import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; @@ -76,6 +77,7 @@ import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isDeletionVectorEnabled; import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.columnsWithStats; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.START_OF_MODERN_ERA_EPOCH_DAY; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD; @@ -395,18 +397,27 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo if (block.isNull(pagePosition)) { return null; } + boolean deletionVectorsEnabled = isDeletionVectorEnabled(metadataEntry); Block addEntryBlock = block.getObject(pagePosition, Block.class); log.debug("Block %s has %s fields", block, addEntryBlock.getPositionCount()); + String path = getString(addEntryBlock, 0); Map partitionValues = getMap(addEntryBlock, 1); long size = getLong(addEntryBlock, 2); long modificationTime = getLong(addEntryBlock, 3); boolean dataChange = getByte(addEntryBlock, 4) != 0; - Map tags = getMap(addEntryBlock, 7); + Optional deletionVector = Optional.empty(); + int position = 5; + if (deletionVectorsEnabled) { + if (!addEntryBlock.isNull(5)) { + deletionVector = Optional.of(parseDeletionVectorFromParquet(addEntryBlock.getObject(5, Block.class))); + } + position = 6; + } + Map tags = getMap(addEntryBlock, position + 2); - String path = getString(addEntryBlock, 0); AddFileEntry result; - if (!addEntryBlock.isNull(6)) { + if (!addEntryBlock.isNull(position + 1)) { result = new AddFileEntry( path, partitionValues, @@ -414,19 +425,21 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo 
modificationTime, dataChange, Optional.empty(), - Optional.of(parseStatisticsFromParquet(addEntryBlock.getObject(6, Block.class))), - tags); + Optional.of(parseStatisticsFromParquet(addEntryBlock.getObject(position + 1, Block.class))), + tags, + deletionVector); } - else if (!addEntryBlock.isNull(5)) { + else if (!addEntryBlock.isNull(position)) { result = new AddFileEntry( path, partitionValues, size, modificationTime, dataChange, - Optional.of(getString(addEntryBlock, 5)), + Optional.of(getString(addEntryBlock, position)), Optional.empty(), - tags); + tags, + deletionVector); } else { result = new AddFileEntry( @@ -437,13 +450,26 @@ else if (!addEntryBlock.isNull(5)) { dataChange, Optional.empty(), Optional.empty(), - tags); + tags, + deletionVector); } log.debug("Result: %s", result); return DeltaLakeTransactionLogEntry.addFileEntry(result); } + private DeletionVectorEntry parseDeletionVectorFromParquet(Block block) + { + checkArgument(block.getPositionCount() == 5, "Deletion vector entry must have 5 fields"); + + String storageType = getString(block, 0); + String pathOrInlineDv = getString(block, 1); + OptionalInt offset = block.isNull(2) ? OptionalInt.empty() : OptionalInt.of(getInt(block, 2)); + int sizeInBytes = getInt(block, 3); + long cardinality = getLong(block, 4); + return new DeletionVectorEntry(storageType, pathOrInlineDv, offset, sizeInBytes, cardinality); + } + private DeltaLakeParquetFileStatistics parseStatisticsFromParquet(Block statsRowBlock) { if (metadataEntry == null) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java index 9125de8b63e9..cdea8eca148a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java @@ -35,6 +35,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isDeletionVectorEnabled; import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.columnsWithStats; import static java.util.Objects.requireNonNull; @@ -42,6 +43,14 @@ public class CheckpointSchemaManager { private final TypeManager typeManager; + private static final RowType DELETION_VECTORS_TYPE = RowType.from(ImmutableList.builder() + .add(RowType.field("storageType", VarcharType.VARCHAR)) + .add(RowType.field("pathOrInlineDv", VarcharType.VARCHAR)) + .add(RowType.field("offset", IntegerType.INTEGER)) + .add(RowType.field("sizeInBytes", IntegerType.INTEGER)) + .add(RowType.field("cardinality", BigintType.BIGINT)) + .build()); + private static final RowType TXN_ENTRY_TYPE = RowType.from(ImmutableList.of( RowType.field("appId", VarcharType.createUnboundedVarcharType()), RowType.field("version", BigintType.BIGINT), @@ -106,6 +115,7 @@ public RowType getAddEntryType(MetadataEntry metadataEntry, boolean requireWrite { List allColumns = extractSchema(metadataEntry, typeManager); List minMaxColumns = columnsWithStats(metadataEntry, typeManager); + boolean deletionVectorEnabled = isDeletionVectorEnabled(metadataEntry); ImmutableList.Builder minMaxFields = ImmutableList.builder(); for 
(DeltaLakeColumnMetadata dataColumn : minMaxColumns) { @@ -139,6 +149,9 @@ public RowType getAddEntryType(MetadataEntry metadataEntry, boolean requireWrite addFields.add(RowType.field("size", BigintType.BIGINT)); addFields.add(RowType.field("modificationTime", BigintType.BIGINT)); addFields.add(RowType.field("dataChange", BooleanType.BOOLEAN)); + if (deletionVectorEnabled) { + addFields.add(RowType.field("deletionVector", DELETION_VECTORS_TYPE)); + } if (requireWriteStatsAsJson) { addFields.add(RowType.field("stats", VarcharType.createUnboundedVarcharType())); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java index fbbdc15f93ff..683674b2f51f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java @@ -19,14 +19,17 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MissingTransactionLogException; +import io.trino.plugin.deltalake.transactionlog.Transaction; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; +import java.util.Collection; import java.util.List; import java.util.Optional; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.parseJson; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath; @@ -37,10 +40,10 @@ public class TransactionLogTail { private static final int JSON_LOG_ENTRY_READ_BUFFER_SIZE = 1024 * 1024; - private final List entries; + private final List entries; private final long version; - private TransactionLogTail(List entries, long version) + private TransactionLogTail(List entries, long version) { this.entries = ImmutableList.copyOf(requireNonNull(entries, "entries is null")); this.version = version; @@ -63,7 +66,7 @@ public static TransactionLogTail loadNewTail( Optional endVersion) throws IOException { - ImmutableList.Builder entriesBuilder = ImmutableList.builder(); + ImmutableList.Builder entriesBuilder = ImmutableList.builder(); long version = startVersion.orElse(0L); long entryNumber = startVersion.map(start -> start + 1).orElse(0L); @@ -75,7 +78,7 @@ public static TransactionLogTail loadNewTail( while (!endOfTail) { results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem); if (results.isPresent()) { - entriesBuilder.addAll(results.get()); + entriesBuilder.add(new Transaction(entryNumber, results.get())); version = entryNumber; entryNumber++; } @@ -97,7 +100,7 @@ public static TransactionLogTail loadNewTail( public Optional getUpdatedTail(TrinoFileSystem fileSystem, String tableLocation) throws IOException { - ImmutableList.Builder entriesBuilder = ImmutableList.builder(); + ImmutableList.Builder entriesBuilder = ImmutableList.builder(); long newVersion = version; @@ -110,7 +113,7 @@ public Optional getUpdatedTail(TrinoFileSystem fileSystem, S // initialize entriesBuilder with entries we have already read 
entriesBuilder.addAll(entries); } - entriesBuilder.addAll(results.get()); + entriesBuilder.add(new Transaction(newVersion + 1, results.get())); newVersion++; } else { @@ -152,6 +155,11 @@ public static Optional> getEntriesFromJson(lo } public List getFileEntries() + { + return entries.stream().map(Transaction::transactionEntries).flatMap(Collection::stream).collect(toImmutableList()); + } + + public List getTransactions() { return entries; } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index 9b5f8f2e92ed..a024340de21d 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -84,6 +84,7 @@ public class TestDeltaLakeBasic private static final List OTHER_TABLES = ImmutableList.of( new ResourceTable("stats_with_minmax_nulls", "deltalake/stats_with_minmax_nulls"), new ResourceTable("no_column_stats", "databricks73/no_column_stats"), + new ResourceTable("deletion_vectors", "databricks122/deletion_vectors"), new ResourceTable("timestamp_ntz", "databricks131/timestamp_ntz"), new ResourceTable("timestamp_ntz_partition", "databricks131/timestamp_ntz_partition")); @@ -629,6 +630,15 @@ public void testIdentityColumns() entry("delta.identity.allowExplicitInsert", false)); } + /** + * @see databricks122.deletion_vectors + */ + @Test + public void testDeletionVectors() + { + assertQuery("SELECT * FROM deletion_vectors", "VALUES (1, 11)"); + } + @Test public void testCorruptedManagedTableLocation() throws Exception diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java index ab10861a604f..ce82ffa0a256 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java @@ -183,13 +183,13 @@ public List getActiveFiles(TableSnapshot tableSnapshot, ConnectorS private AddFileEntry addFileEntryOfSize(long fileSize) { - return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of()); + return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty()); } private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize, double minimumAssignedSplitWeight) { SplitWeight splitWeight = SplitWeight.fromProportion(Math.min(Math.max((double) fileSize / splitSize, minimumAssignedSplitWeight), 1.0)); - return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, splitWeight, TupleDomain.all(), ImmutableMap.of()); + return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), splitWeight, TupleDomain.all(), ImmutableMap.of()); } private List getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java index 3145c89fa043..83f6eb3f15bc 100644 --- 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java @@ -129,7 +129,8 @@ public void readsCheckpointFile() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); assertThat(entries).element(7).extracting(DeltaLakeTransactionLogEntry::getAdd).isEqualTo( new AddFileEntry( @@ -145,7 +146,8 @@ public void readsCheckpointFile() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); } // lets read two entry types in one call; add and protocol @@ -169,7 +171,8 @@ public void readsCheckpointFile() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); assertThat(entries).element(6).extracting(DeltaLakeTransactionLogEntry::getProtocol).isEqualTo(new ProtocolEntry(1, 2, Optional.empty(), Optional.empty())); @@ -187,7 +190,8 @@ public void readsCheckpointFile() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java index f19248c7fbb2..738272a7f381 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java @@ -58,10 +58,10 @@ public void testCheckpointBuilder() builder.addLogEntry(transactionEntry(app1TransactionV1)); builder.addLogEntry(transactionEntry(app2TransactionV5)); - AddFileEntry addA1 = new AddFileEntry("a", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of()); + AddFileEntry addA1 = new AddFileEntry("a", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); RemoveFileEntry removeA1 = new RemoveFileEntry("a", 1, true); - AddFileEntry addA2 = new AddFileEntry("a", Map.of(), 2, 1, true, Optional.empty(), Optional.empty(), Map.of()); - AddFileEntry addB = new AddFileEntry("b", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of()); + AddFileEntry addA2 = new AddFileEntry("a", Map.of(), 2, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); + AddFileEntry addB = new AddFileEntry("b", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); RemoveFileEntry removeB = new RemoveFileEntry("b", 1, true); RemoveFileEntry removeC = new RemoveFileEntry("c", 1, true); builder.addLogEntry(addFileEntry(addA1)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java index 309da4d6c042..e22568e403df 100644 --- 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java @@ -155,7 +155,8 @@ public void testReadAddEntries() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); assertThat(entries).element(7).extracting(DeltaLakeTransactionLogEntry::getAdd).isEqualTo( new AddFileEntry( @@ -171,7 +172,8 @@ public void testReadAddEntries() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); } @Test @@ -216,7 +218,8 @@ public void testReadAllEntries() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); // RemoveFileEntry assertThat(entries).element(3).extracting(DeltaLakeTransactionLogEntry::getRemove).isEqualTo( @@ -268,7 +271,8 @@ public void testSkipRemoveEntries() "\"ts\":1" + "}}"), Optional.empty(), - ImmutableMap.of()); + ImmutableMap.of(), + Optional.empty()); int numRemoveEntries = 100; Set removeEntries = IntStream.range(0, numRemoveEntries).mapToObj(x -> diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java index 3adbc82eed38..0e7edd7d3d14 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java @@ -178,7 +178,8 @@ public void testCheckpointWriteReadJsonRoundtrip() Optional.empty(), ImmutableMap.of( "someTag", "someValue", - "otherTag", "otherValue")); + "otherTag", "otherValue"), + Optional.empty()); RemoveFileEntry removeFileEntry = new RemoveFileEntry( "removeFilePath", @@ -314,7 +315,8 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip() .buildOrThrow()))), ImmutableMap.of( "someTag", "someValue", - "otherTag", "otherValue")); + "otherTag", "otherValue"), + Optional.empty()); RemoveFileEntry removeFileEntry = new RemoveFileEntry( "removeFilePath", @@ -390,7 +392,8 @@ public void testDisablingRowStatistics() "row", RowBlock.fromFieldBlocks(1, Optional.empty(), minMaxRowFieldBlocks).getSingleValueBlock(0))), Optional.of(ImmutableMap.of( "row", RowBlock.fromFieldBlocks(1, Optional.empty(), nullCountRowFieldBlocks).getSingleValueBlock(0))))), - ImmutableMap.of()); + ImmutableMap.of(), + Optional.empty()); CheckpointEntries entries = new CheckpointEntries( metadataEntry, @@ -428,7 +431,8 @@ private AddFileEntry makeComparable(AddFileEntry original) original.isDataChange(), original.getStatsString(), makeComparable(original.getStats()), - original.getTags()); + original.getTags(), + original.getDeletionVector()); } private Optional makeComparable(Optional original) diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java 
b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java index 0a7f9cd02837..bceb68a685b9 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java @@ -13,10 +13,20 @@ */ package io.trino.tests.product.deltalake; +import com.google.common.collect.ImmutableList; +import io.trino.tempto.BeforeMethodWithContext; +import io.trino.tempto.assertions.QueryAssert.Row; import io.trino.testing.DataProviders; import io.trino.testng.services.Flaky; +import io.trino.tests.product.deltalake.util.DatabricksVersion; +import org.testng.SkipException; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; + import static io.trino.tempto.assertions.QueryAssert.Row.row; import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; import static io.trino.testing.TestingNames.randomNameSuffix; @@ -29,13 +39,23 @@ import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestDeltaLakeDeleteCompatibility extends BaseTestDeltaLakeS3Storage { + private Optional databricksRuntimeVersion; + + @BeforeMethodWithContext + public void determineDatabricksVersion() + { + databricksRuntimeVersion = getDatabricksRuntimeVersion(); + } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}, dataProviderClass = DataProviders.class, dataProvider = "trueFalse") @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testDeleteOnEnforcedConstraintsReturnsRowsCount(boolean partitioned) @@ -122,50 +142,393 @@ public void testTruncateTable() } // Databricks 12.1 and OSS Delta 2.4.0 added support for deletion vectors - @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider") @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) - public void testDeletionVectors() + public void testDeletionVectors(String mode) { - // TODO https://github.com/trinodb/trino/issues/16903 Add support for deletionVectors reader features String tableName = "test_deletion_vectors_" + randomNameSuffix(); onDelta().executeQuery("" + "CREATE TABLE default." 
+ tableName + " (a INT, b INT)" + " USING delta " + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + - " TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + " TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.columnMapping.mode' = '" + mode + "')"); try { onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2, 22)"); - onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2"); + if (databricksRuntimeVersion.isEmpty() && (mode.equals("name") || mode.equals("id"))) { + assertQueryFailure(() -> onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2")) + .hasMessageContaining("Can't resolve column __delta_internal_row_index in root"); + throw new SkipException("OSS Delta Lake doesn't support deletion vectors with column mapping mode 'name' and 'id'"); + } + else { + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2"); + } assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) .containsOnly(row(1, 11)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 11)); + + // Reinsert the deleted row and verify that the row appears correctly + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (2, 22)"); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + // Execute DELETE statement which doesn't delete any rows + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = -1"); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + + // Verify other statements assertThat(onTrino().executeQuery("SHOW TABLES FROM delta.default")) .contains(row(tableName)); assertThat(onTrino().executeQuery("SELECT version, operation FROM delta.default.\"" + tableName + "$history\"")) // Use 'contains' method because newer Databricks clusters execute OPTIMIZE statement in the background .contains(row(0, "CREATE TABLE"), row(1, "WRITE"), row(2, "DELETE")); - assertThat(onTrino().executeQuery("SELECT comment FROM information_schema.columns WHERE table_schema = 'default' AND table_name = '" + tableName + "'")) - .hasNoRows(); - assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) - .hasMessageMatching(".* Table .* does not exist"); - assertQueryFailure(() -> onTrino().executeQuery("SHOW COLUMNS FROM delta.default." + tableName)) - .hasMessageMatching(".* Table .* does not exist"); - assertQueryFailure(() -> onTrino().executeQuery("DESCRIBE delta.default." + tableName)) - .hasMessageMatching(".* Table .* does not exist"); + assertThat(onTrino().executeQuery("SELECT column_name FROM delta.information_schema.columns WHERE table_schema = 'default' AND table_name = '" + tableName + "'")) + .contains(row("a"), row("b")); + assertThat(onTrino().executeQuery("SHOW COLUMNS FROM delta.default." + tableName)) + .contains(row("a", "integer", "", ""), row("b", "integer", "", "")); + assertThat(onTrino().executeQuery("DESCRIBE delta.default." 
+                    .contains(row("a", "integer", "", ""), row("b", "integer", "", ""));
+
+            // TODO https://github.com/trinodb/trino/issues/17063 Use Delta Deletion Vectors for row-level deletes
             assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (3, 33)"))
-                    .hasMessageMatching(".* Table .* does not exist");
+                    .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported");
             assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName))
-                    .hasMessageMatching(".* Table .* does not exist");
+                    .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported");
             assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3"))
-                    .hasMessageMatching(".* Table .* does not exist");
+                    .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported");
             assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " +
                     "ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = -1"))
-                    .hasMessageMatching(".* Table .* does not exist");
+                    .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported");
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testDeletionVectorsWithRandomPrefix()
+    {
+        String tableName = "test_deletion_vectors_random_prefix_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(a INT, b INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.randomizeFilePrefixes' = true)");
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11), (2, 22)");
+            onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2");
+
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+                    .containsOnly(row(1, 11));
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+                    .containsOnly(row(1, 11));
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testDisableDeletionVectors()
+    {
+        String tableName = "test_disable_deletion_vectors_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(a INT, b INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11), (2, 22), (3, 33)");
+            onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2");
+
+            onDelta().executeQuery("ALTER TABLE default." + tableName + " SET TBLPROPERTIES ('delta.enableDeletionVectors' = false)");
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+                    .containsOnly(row(1, 11), row(3, 33));
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+                    .containsOnly(row(1, 11), row(3, 33));
+
+            // Delete rows which already existed before disabling deletion vectors
+            onDelta().executeQuery("DELETE FROM default." + tableName);
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+                    .hasNoRows();
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+                    .hasNoRows();
+
+            // Insert new rows and delete one of them after disabling deletion vectors
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (4, 44), (5, 55)");
+            onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 4");
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+                    .containsOnly(row(5, 55));
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+                    .containsOnly(row(5, 55));
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testDeletionVectorsWithCheckpointInterval()
+    {
+        String tableName = "test_deletion_vectors_checkpoint_interval_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(a INT, b INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.checkpointInterval' = 1)");
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11), (2, 22)");
+            onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2");
+
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+                    .containsOnly(row(1, 11));
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+                    .containsOnly(row(1, 11));
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testDeletionVectorsMergeDelete()
+    {
+        String tableName = "test_deletion_vectors_merge_delete_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(a INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " SELECT explode(sequence(1, 10))");
+            onDelta().executeQuery("MERGE INTO default." + tableName + " t USING default." + tableName + " s " +
+                    "ON (t.a = s.a) WHEN MATCHED AND t.a > 5 THEN DELETE");
+
+            List<Row> expected = ImmutableList.of(row(1), row(2), row(3), row(4), row(5));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected);
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testDeletionVectorsLargeNumbers()
+    {
+        String tableName = "test_deletion_vectors_large_numbers_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(a INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " SELECT explode(sequence(1, 10000))");
+            onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a > 1");
+
+            List<Row> expected = ImmutableList.of(row(1));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected);
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS},
+            dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testDeletionVectorsAcrossAddFile(boolean partitioned)
+    {
+        String tableName = "test_deletion_vectors_across_add_file_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(a INT, b INT)" +
+                "USING delta " +
+                (partitioned ? "PARTITIONED BY (a)" : "") +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22)");
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3,33), (4,44)");
+            onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2 OR a = 4");
+
+            List<Row> expected = ImmutableList.of(row(1, 11), row(3, 33));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).containsOnly(expected);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(expected);
+
+            // Verify behavior when the query doesn't read non-partition columns
+            assertThat(onTrino().executeQuery("SELECT count(*) FROM delta.default." + tableName)).containsOnly(row(2));
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testDeletionVectorsTruncateTable()
+    {
+        testDeletionVectorsDeleteAll(tableName -> {
+            if (databricksRuntimeVersion.isPresent()) {
+                onDelta().executeQuery("TRUNCATE TABLE default." + tableName);
+            }
+            else {
+                assertThatThrownBy(() -> onDelta().executeQuery("TRUNCATE TABLE default." + tableName))
+                        .hasMessageContaining("Table does not support truncates");
+                throw new SkipException("OSS Delta Lake doesn't support truncating tables with deletion vectors");
+            }
+        });
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testDeletionVectorsDeleteFrom()
+    {
+        testDeletionVectorsDeleteAll(tableName -> onDelta().executeQuery("DELETE FROM default." + tableName));
+    }
+
+    private void testDeletionVectorsDeleteAll(Consumer<String> deleteRows)
+    {
+        String tableName = "test_deletion_vectors_delete_all_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(a INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " SELECT explode(sequence(1, 1000))");
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).hasRowsCount(1000);
+
+            deleteRows.accept(tableName);
+
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).hasNoRows();
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).hasNoRows();
         }
         finally {
             dropDeltaTableWithRetry("default." + tableName);
         }
     }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testDeletionVectorsOptimize()
+    {
+        String tableName = "test_deletion_vectors_optimize_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(a INT, b INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22)");
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3,33), (4,44)");
+            onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 1 OR a = 3");
+
+            List<Row> expected = ImmutableList.of(row(2, 22), row(4, 44));
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected);
+
+            onDelta().executeQuery("OPTIMIZE default." + tableName);
+
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected);
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testDeletionVectorsAbsolutePath()
+    {
+        String baseTableName = "test_deletion_vectors_base_absolute_" + randomNameSuffix();
+        String tableName = "test_deletion_vectors_absolute_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + baseTableName +
+                "(a INT, b INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + baseTableName + "' " +
+                "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+        try {
+            onDelta().executeQuery("INSERT INTO default." + baseTableName + " VALUES (1,11), (2,22), (3,33), (4,44)");
+            onDelta().executeQuery("DELETE FROM default." + baseTableName + " WHERE a = 1 OR a = 3");
+
+            // The cloned table has 'p' (absolute path) storageType for deletion vector
+            onDelta().executeQuery("CREATE TABLE default." + tableName + " SHALLOW CLONE " + baseTableName);
+
+            List<Row> expected = ImmutableList.of(row(2, 22), row(4, 44));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected);
+            // TODO https://github.com/trinodb/trino/issues/17205 Fix below assertion when supporting absolute path
+            assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+                    .hasMessageContaining("Failed to generate splits");
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + baseTableName);
+            dropDeltaTableWithRetry("default." + tableName);
+        }
+    }
+
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testDeletionVectorsWithChangeDataFeed()
+    {
+        String tableName = "test_deletion_vectors_cdf_" + randomNameSuffix();
+        onDelta().executeQuery("" +
+                "CREATE TABLE default." + tableName +
+                "(a INT, b INT)" +
+                "USING delta " +
+                "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+                "TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.enableChangeDataFeed' = true)");
+        try {
+            onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22), (3,33), (4,44)");
+            onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 1 OR a = 3");
+
+            assertThat(onDelta().executeQuery(
+                    "SELECT a, b, _change_type, _commit_version FROM table_changes('default." + tableName + "', 0)"))
+                    .containsOnly(
+                            row(1, 11, "insert", 1L),
+                            row(2, 22, "insert", 1L),
+                            row(3, 33, "insert", 1L),
+                            row(4, 44, "insert", 1L),
+                            row(1, 11, "delete", 2L),
+                            row(3, 33, "delete", 2L));
+
+            // TODO Fix table_changes function failure
+            assertQueryFailure(() -> onTrino().executeQuery("SELECT a, b, _change_type, _commit_version FROM TABLE(delta.system.table_changes('default', '" + tableName + "', 0))"))
+                    .hasMessageContaining("Change Data Feed is not enabled at version 2. Version contains 'remove' entries without 'cdc' entries");
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + tableName);
+        }
+    }
+
+    @DataProvider
+    public Object[][] columnMappingModeDataProvider()
+    {
+        return new Object[][] {
+                {"none"},
+                {"name"},
+                {"id"}
+        };
+    }
 }
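Editorial aside, not part of the diff above: the product tests exercise reads of Delta files that carry a deletion vector, where the reader conceptually keeps a bitmap of deleted row positions per data file and drops matching rows. The following is a minimal, hypothetical sketch of that idea using the RoaringBitmap library's Roaring64NavigableMap (requires org.roaringbitmap:RoaringBitmap on the classpath); the class and method names here are illustrative only and are not the connector's actual types.

import org.roaringbitmap.longlong.Roaring64NavigableMap;

// Hypothetical illustration of a per-file set of deleted row positions.
final class DeletedPositions
{
    private final Roaring64NavigableMap deletedRows = new Roaring64NavigableMap();

    void markDeleted(long rowPosition)
    {
        // Record that the row at this position in the data file is deleted
        deletedRows.addLong(rowPosition);
    }

    boolean isDeleted(long rowPosition)
    {
        return deletedRows.contains(rowPosition);
    }

    public static void main(String[] args)
    {
        // Mirrors the simplest scenario in the tests above: two rows written,
        // the second one (file position 1) marked deleted by a deletion vector.
        DeletedPositions deleted = new DeletedPositions();
        deleted.markDeleted(1);
        for (long position = 0; position < 2; position++) {
            if (!deleted.isDeleted(position)) {
                System.out.println("keep row at file position " + position);
            }
        }
    }
}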