From 6f155593e27a25b0eb15f9783c55fcf10a36dca7 Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Fri, 1 Sep 2023 12:43:08 +0900
Subject: [PATCH] Add support for writer version 7 in Delta Lake

Support the following writer features:
* appendOnly
* invariants
* checkConstraints
* changeDataFeed
* columnMapping
---
 .../deltalake/DeltaLakeInsertTableHandle.java | 10 ++
 .../plugin/deltalake/DeltaLakeMetadata.java | 117 +++++++++++-------
 .../deltalake/DeltaLakePageSinkProvider.java | 14 ++-
 .../DeltaLakePageSourceProvider.java | 4 +-
 .../deltalake/DeltaLakeParquetSchemas.java | 9 +-
 .../deltalake/DeltaLakeSplitManager.java | 2 +-
 .../procedure/DeltaTableOptimizeHandle.java | 11 ++
 .../deltalake/procedure/VacuumProcedure.java | 8 +-
 .../FileBasedTableStatisticsProvider.java | 2 +-
 .../DeltaLakeSchemaSupport.java | 87 ++++++++++---
 .../transactionlog/MetadataEntry.java | 16 ---
 .../transactionlog/ProtocolEntry.java | 20 +++
 .../transactionlog/TableSnapshot.java | 73 +++++++----
 .../transactionlog/TransactionLogAccess.java | 4 +-
 .../checkpoint/CheckpointEntryIterator.java | 14 ++-
 .../checkpoint/CheckpointSchemaManager.java | 9 +-
 .../checkpoint/CheckpointWriter.java | 16 +--
 .../plugin/deltalake/TestDeltaLakeBasic.java | 4 +-
 ...eltaLakeMinioAndHmsConnectorSmokeTest.java | 22 ++++
 .../deltalake/TestTransactionLogAccess.java | 2 +-
 .../TestCheckpointEntryIterator.java | 26 ++--
 .../checkpoint/TestCheckpointWriter.java | 9 +-
 .../TestDeltaLakeFileStatistics.java | 18 +++
 .../invariants_writer_feature/README.md | 29 +++++
 .../_delta_log/00000000000000000000.json | 3 +
 .../_delta_log/00000000000000000001.json | 2 +
 ...4bd0-8f60-4ce39aa50ef9-c000.snappy.parquet | Bin 0 -> 650 bytes
 .../TestDeltaLakeAlterTableCompatibility.java | 84 +++++++++++--
 ...tDeltaLakeChangeDataFeedCompatibility.java | 65 ++++++++++
 ...DeltaLakeCheckConstraintCompatibility.java | 41 ++++++
 .../TestDeltaLakeColumnMappingMode.java | 46 +++++++
 ...akeDatabricksCheckpointsCompatibility.java | 19 ++-
 .../TestDeltaLakeDeleteCompatibility.java | 55 +++++++-
 ...tDeltaLakeIdentityColumnCompatibility.java | 8 +-
 ...DeltaLakeWriteDatabricksCompatibility.java | 28 ++++-
 .../deltalake/util/DeltaLakeTestUtils.java | 29 +++++
 36 files changed, 738 insertions(+), 168 deletions(-)
 create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/README.md
 create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/_delta_log/00000000000000000000.json
 create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/_delta_log/00000000000000000001.json
 create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/part-00001-3e38bdb2-fccc-4bd0-8f60-4ce39aa50ef9-c000.snappy.parquet

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeInsertTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeInsertTableHandle.java
index e73f09f0aac4..fef3c829aa11 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeInsertTableHandle.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeInsertTableHandle.java
@@ -17,6 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import 
io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.SchemaTableName; @@ -30,6 +31,7 @@ public class DeltaLakeInsertTableHandle private final SchemaTableName tableName; private final String location; private final MetadataEntry metadataEntry; + private final ProtocolEntry protocolEntry; private final List inputColumns; private final long readVersion; private final boolean retriesEnabled; @@ -39,12 +41,14 @@ public DeltaLakeInsertTableHandle( @JsonProperty("tableName") SchemaTableName tableName, @JsonProperty("location") String location, @JsonProperty("metadataEntry") MetadataEntry metadataEntry, + @JsonProperty("protocolEntry") ProtocolEntry protocolEntry, @JsonProperty("inputColumns") List inputColumns, @JsonProperty("readVersion") long readVersion, @JsonProperty("retriesEnabled") boolean retriesEnabled) { this.tableName = requireNonNull(tableName, "tableName is null"); this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null"); + this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null"); this.inputColumns = ImmutableList.copyOf(inputColumns); this.location = requireNonNull(location, "location is null"); this.readVersion = readVersion; @@ -69,6 +73,12 @@ public MetadataEntry getMetadataEntry() return metadataEntry; } + @JsonProperty + public ProtocolEntry getProtocolEntry() + { + return protocolEntry; + } + @JsonProperty public List getInputColumns() { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index ef8620448cf5..2d998ddbc35b 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -239,6 +239,7 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeSchemaAsJson; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedReaderFeatures; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.validateType; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.verifySupportedColumnMapping; import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY; @@ -331,7 +332,7 @@ public class DeltaLakeMetadata public static final int DEFAULT_WRITER_VERSION = 2; // The highest reader and writer versions Trino supports private static final int MAX_READER_VERSION = 3; - public static final int MAX_WRITER_VERSION = 6; + public static final int MAX_WRITER_VERSION = 7; private static final int CDF_SUPPORTED_WRITER_VERSION = 4; private static final int COLUMN_MAPPING_MODE_SUPPORTED_READER_VERSION = 2; private static final int COLUMN_MAPPING_MODE_SUPPORTED_WRITER_VERSION = 5; @@ -517,7 +518,7 @@ public LocatedTableHandle getTableHandle(ConnectorSession session, SchemaTableNa LOG.debug("Skip %s because the table contains unsupported reader features: %s", tableName, unsupportedReaderFeatures); return null; } - verifySupportedColumnMapping(getColumnMappingMode(metadataEntry)); + verifySupportedColumnMapping(getColumnMappingMode(metadataEntry, protocolEntry)); return new DeltaLakeTableHandle( 
tableName.getSchemaName(), tableName.getTableName(), @@ -562,12 +563,13 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect // This method does not calculate column metadata for the projected columns checkArgument(tableHandle.getProjectedColumns().isEmpty(), "Unexpected projected columns"); MetadataEntry metadataEntry = tableHandle.getMetadataEntry(); + ProtocolEntry protocolEntry = tableHandle.getProtocolEntry(); List constraints = ImmutableList.builder() - .addAll(getCheckConstraints(metadataEntry).values()) - .addAll(getColumnInvariants(metadataEntry).values()) // The internal logic for column invariants in Delta Lake is same as check constraints + .addAll(getCheckConstraints(metadataEntry, protocolEntry).values()) + .addAll(getColumnInvariants(metadataEntry, protocolEntry).values()) // The internal logic for column invariants in Delta Lake is same as check constraints .build(); - List columns = getTableColumnMetadata(metadataEntry); + List columns = getTableColumnMetadata(metadataEntry, protocolEntry); ImmutableMap.Builder properties = ImmutableMap.builder() .put(LOCATION_PROPERTY, tableHandle.getLocation()); @@ -579,10 +581,10 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect Optional checkpointInterval = metadataEntry.getCheckpointInterval(); checkpointInterval.ifPresent(value -> properties.put(CHECKPOINT_INTERVAL_PROPERTY, value)); - Optional changeDataFeedEnabled = metadataEntry.isChangeDataFeedEnabled(); - changeDataFeedEnabled.ifPresent(value -> properties.put(CHANGE_DATA_FEED_ENABLED_PROPERTY, value)); + changeDataFeedEnabled(metadataEntry, protocolEntry) + .ifPresent(value -> properties.put(CHANGE_DATA_FEED_ENABLED_PROPERTY, value)); - ColumnMappingMode columnMappingMode = DeltaLakeSchemaSupport.getColumnMappingMode(metadataEntry); + ColumnMappingMode columnMappingMode = DeltaLakeSchemaSupport.getColumnMappingMode(metadataEntry, protocolEntry); if (columnMappingMode != NONE) { properties.put(COLUMN_MAPPING_MODE_PROPERTY, columnMappingMode.name()); } @@ -604,12 +606,12 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect .collect(toImmutableList())); } - private List getTableColumnMetadata(MetadataEntry metadataEntry) + private List getTableColumnMetadata(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { Map columnComments = getColumnComments(metadataEntry); Map columnsNullability = getColumnsNullability(metadataEntry); Map columnGenerations = getGeneratedColumnExpressions(metadataEntry); - List columns = getColumns(metadataEntry).stream() + List columns = getColumns(metadataEntry, protocolEntry).stream() .map(column -> getColumnMetadata(column, columnComments.get(column.getBaseColumnName()), columnsNullability.getOrDefault(column.getBaseColumnName(), true), columnGenerations.get(column.getBaseColumnName()))) .collect(toImmutableList()); return columns; @@ -632,7 +634,7 @@ public Map getColumnHandles(ConnectorSession session, Conn DeltaLakeTableHandle table = checkValidTableHandle(tableHandle); return table.getProjectedColumns() .map(projectColumns -> (Collection) projectColumns) - .orElseGet(() -> getColumns(table.getMetadataEntry())).stream() + .orElseGet(() -> getColumns(table.getMetadataEntry(), table.getProtocolEntry())).stream() // This method does not calculate column name for the projected columns .peek(handle -> checkArgument(handle.isBaseColumn(), "Unsupported projected column: %s", handle)) .collect(toImmutableMap(DeltaLakeColumnHandle::getBaseColumnName, 
identity())); @@ -711,11 +713,13 @@ public Iterator streamTableColumns(ConnectorSession sessio return Stream.of(); } String tableLocation = metastoreTable.get().location(); - MetadataEntry metadata = transactionLogAccess.getMetadataEntry(getSnapshot(table, tableLocation, session), session); + TableSnapshot snapshot = getSnapshot(table, tableLocation, session); + MetadataEntry metadata = transactionLogAccess.getMetadataEntry(snapshot, session); + ProtocolEntry protocol = transactionLogAccess.getProtocolEntry(session, snapshot); Map columnComments = getColumnComments(metadata); Map columnsNullability = getColumnsNullability(metadata); Map columnGenerations = getGeneratedColumnExpressions(metadata); - List columnMetadata = getColumns(metadata).stream() + List columnMetadata = getColumns(metadata, protocol).stream() .map(column -> getColumnMetadata(column, columnComments.get(column.getColumnName()), columnsNullability.getOrDefault(column.getBaseColumnName(), true), columnGenerations.get(column.getBaseColumnName()))) .collect(toImmutableList()); return Stream.of(TableColumnsMetadata.forTable(table, columnMetadata)); @@ -733,10 +737,10 @@ public Iterator streamTableColumns(ConnectorSession sessio .iterator(); } - private List getColumns(MetadataEntry deltaMetadata) + private List getColumns(MetadataEntry deltaMetadata, ProtocolEntry protocolEntry) { ImmutableList.Builder columns = ImmutableList.builder(); - extractSchema(deltaMetadata, typeManager).stream() + extractSchema(deltaMetadata, protocolEntry, typeManager).stream() .map(column -> toColumnHandle(column.getName(), column.getType(), column.getFieldId(), column.getPhysicalName(), column.getPhysicalColumnType(), deltaMetadata.getLowercasePartitionColumns())) .forEach(columns::add); columns.add(pathColumnHandle()); @@ -1276,10 +1280,12 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table { DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); checkSupportedWriterVersion(handle); - ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry()); + ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry(), handle.getProtocolEntry()); if (columnMappingMode != ID && columnMappingMode != NAME && columnMappingMode != NONE) { throw new TrinoException(NOT_SUPPORTED, "Setting a table comment with column mapping %s is not supported".formatted(columnMappingMode)); } + ProtocolEntry protocolEntry = handle.getProtocolEntry(); + checkUnsupportedWriterFeatures(protocolEntry); ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle); @@ -1297,7 +1303,7 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table SET_TBLPROPERTIES_OPERATION, session, comment, - handle.getProtocolEntry()); + protocolEntry); transactionLogWriter.flush(); } catch (Exception e) { @@ -1312,10 +1318,12 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl DeltaLakeColumnHandle deltaLakeColumnHandle = (DeltaLakeColumnHandle) column; verify(deltaLakeColumnHandle.isBaseColumn(), "Unexpected dereference: %s", column); checkSupportedWriterVersion(deltaLakeTableHandle); - ColumnMappingMode columnMappingMode = getColumnMappingMode(deltaLakeTableHandle.getMetadataEntry()); + ColumnMappingMode columnMappingMode = getColumnMappingMode(deltaLakeTableHandle.getMetadataEntry(), deltaLakeTableHandle.getProtocolEntry()); if (columnMappingMode != ID && columnMappingMode != NAME && columnMappingMode != NONE) { throw new 
TrinoException(NOT_SUPPORTED, "Setting a column comment with column mapping %s is not supported".formatted(columnMappingMode)); } + ProtocolEntry protocolEntry = deltaLakeTableHandle.getProtocolEntry(); + checkUnsupportedWriterFeatures(protocolEntry); try { long commitVersion = deltaLakeTableHandle.getReadVersion() + 1; @@ -1341,7 +1349,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl CHANGE_COLUMN_OPERATION, session, Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription()), - deltaLakeTableHandle.getProtocolEntry()); + protocolEntry); transactionLogWriter.flush(); } catch (Exception e) { @@ -1365,11 +1373,13 @@ public void setViewColumnComment(ConnectorSession session, SchemaTableName viewN public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata newColumnMetadata) { DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); + ProtocolEntry protocolEntry = handle.getProtocolEntry(); checkSupportedWriterVersion(handle); - ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry()); - if (changeDataFeedEnabled(handle.getMetadataEntry()) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) { + ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry(), protocolEntry); + if (changeDataFeedEnabled(handle.getMetadataEntry(), protocolEntry).orElse(false) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) { throw new TrinoException(NOT_SUPPORTED, "Column name %s is forbidden when change data feed is enabled".formatted(newColumnMetadata.getName())); } + checkUnsupportedWriterFeatures(protocolEntry); if (!newColumnMetadata.isNullable() && !transactionLogAccess.getActiveFiles(getSnapshot(handle.getSchemaTableName(), handle.getLocation(), session), session).isEmpty()) { throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to add NOT NULL column '%s' for non-empty table: %s.%s", newColumnMetadata.getName(), handle.getSchemaName(), handle.getTableName())); @@ -1427,7 +1437,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle ADD_COLUMN_OPERATION, session, Optional.ofNullable(handle.getMetadataEntry().getDescription()), - handle.getProtocolEntry()); + protocolEntry); transactionLogWriter.flush(); } catch (Exception e) { @@ -1443,9 +1453,11 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl verify(deltaLakeColumn.isBaseColumn(), "Unexpected dereference: %s", deltaLakeColumn); String dropColumnName = deltaLakeColumn.getBaseColumnName(); MetadataEntry metadataEntry = table.getMetadataEntry(); + ProtocolEntry protocolEntry = table.getProtocolEntry(); + checkUnsupportedWriterFeatures(protocolEntry); checkSupportedWriterVersion(table); - ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry); + ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry, protocolEntry); if (columnMappingMode != ColumnMappingMode.NAME && columnMappingMode != ColumnMappingMode.ID) { throw new TrinoException(NOT_SUPPORTED, "Cannot drop column from table using column mapping mode " + columnMappingMode); } @@ -1458,7 +1470,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl // Use equalsIgnoreCase because the remote column name can contain uppercase characters // Creating a table with ambiguous names (e.g. 
"a" and "A") is disallowed, so this should be safe - List columns = extractSchema(metadataEntry, typeManager); + List columns = extractSchema(metadataEntry, protocolEntry, typeManager); List columnNames = getExactColumnNames(metadataEntry).stream() .filter(name -> !name.equalsIgnoreCase(dropColumnName)) .collect(toImmutableList()); @@ -1493,7 +1505,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl DROP_COLUMN_OPERATION, session, Optional.ofNullable(metadataEntry.getDescription()), - table.getProtocolEntry()); + protocolEntry); transactionLogWriter.flush(); } catch (Exception e) { @@ -1524,14 +1536,16 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan DeltaLakeColumnHandle deltaLakeColumn = (DeltaLakeColumnHandle) columnHandle; verify(deltaLakeColumn.isBaseColumn(), "Unexpected dereference: %s", deltaLakeColumn); String sourceColumnName = deltaLakeColumn.getBaseColumnName(); + ProtocolEntry protocolEntry = table.getProtocolEntry(); + checkUnsupportedWriterFeatures(protocolEntry); checkSupportedWriterVersion(table); - if (changeDataFeedEnabled(table.getMetadataEntry())) { + if (changeDataFeedEnabled(table.getMetadataEntry(), protocolEntry).orElse(false)) { throw new TrinoException(NOT_SUPPORTED, "Cannot rename column when change data feed is enabled"); } MetadataEntry metadataEntry = table.getMetadataEntry(); - ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry); + ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry, protocolEntry); if (columnMappingMode != ColumnMappingMode.NAME && columnMappingMode != ColumnMappingMode.ID) { throw new TrinoException(NOT_SUPPORTED, "Cannot rename column in table using column mapping mode " + columnMappingMode); } @@ -1575,7 +1589,7 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan RENAME_COLUMN_OPERATION, session, Optional.ofNullable(metadataEntry.getDescription()), - table.getProtocolEntry()); + protocolEntry); transactionLogWriter.flush(); // Don't update extended statistics because it uses physical column names internally } @@ -1712,6 +1726,7 @@ private DeltaLakeInsertTableHandle createInsertHandle(ConnectorSession session, table.getSchemaTableName(), tableLocation, table.getMetadataEntry(), + table.getProtocolEntry(), inputColumns, getMandatoryCurrentVersion(fileSystem, tableLocation), retryMode != NO_RETRIES); @@ -1771,7 +1786,7 @@ public Optional finishInsert( // it is not obvious why we need to persist this readVersion transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, createdTime, INSERT_OPERATION, handle.getReadVersion())); - ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry()); + ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry(), handle.getProtocolEntry()); List partitionColumns = getPartitionColumns( handle.getMetadataEntry().getOriginalPartitionColumns(), handle.getInputColumns(), @@ -1797,7 +1812,7 @@ public Optional finishInsert( maxFileModificationTime, computedStatistics, exactColumnNames, - Optional.of(extractSchema(handle.getMetadataEntry(), typeManager).stream() + Optional.of(extractSchema(handle.getMetadataEntry(), handle.getProtocolEntry(), typeManager).stream() .collect(toImmutableMap(DeltaLakeColumnMetadata::getName, DeltaLakeColumnMetadata::getPhysicalName)))); } } @@ -1859,13 +1874,13 @@ public Optional getUpdateLayout(ConnectorSession se public ConnectorMergeTableHandle 
beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode) { DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle; - if (isAppendOnly(handle.getMetadataEntry())) { + if (isAppendOnly(handle.getMetadataEntry(), handle.getProtocolEntry())) { throw new TrinoException(NOT_SUPPORTED, "Cannot modify rows from a table with '" + APPEND_ONLY_CONFIGURATION_KEY + "' set to true"); } checkWriteAllowed(session, handle); checkWriteSupported(handle); - List inputColumns = getColumns(handle.getMetadataEntry()).stream() + List inputColumns = getColumns(handle.getMetadataEntry(), handle.getProtocolEntry()).stream() .filter(column -> column.getColumnType() != SYNTHESIZED) .collect(toImmutableList()); @@ -1926,7 +1941,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg long writeTimestamp = Instant.now().toEpochMilli(); - ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry()); + ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry(), handle.getProtocolEntry()); List partitionColumns = getPartitionColumns( handle.getMetadataEntry().getOriginalPartitionColumns(), mergeHandle.getInsertTableHandle().getInputColumns(), @@ -1986,6 +2001,7 @@ public Optional getTableHandleForExecute( RetryMode retryMode) { DeltaLakeTableHandle tableHandle = checkValidTableHandle(connectorTableHandle); + checkUnsupportedWriterFeatures(tableHandle.getProtocolEntry()); DeltaLakeTableProcedureId procedureId; try { @@ -2004,7 +2020,7 @@ private Optional getTableHandleForOptimize(DeltaLak { DataSize maxScannedFileSize = (DataSize) executeProperties.get("file_size_threshold"); - List columns = getColumns(tableHandle.getMetadataEntry()).stream() + List columns = getColumns(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry()).stream() .filter(column -> column.getColumnType() != SYNTHESIZED) .collect(toImmutableList()); @@ -2013,6 +2029,7 @@ private Optional getTableHandleForOptimize(DeltaLak OPTIMIZE, new DeltaTableOptimizeHandle( tableHandle.getMetadataEntry(), + tableHandle.getProtocolEntry(), columns, tableHandle.getMetadataEntry().getOriginalPartitionColumns(), maxScannedFileSize, @@ -2133,7 +2150,7 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl List partitionColumns = getPartitionColumns( optimizeHandle.getMetadataEntry().getOriginalPartitionColumns(), optimizeHandle.getTableColumns(), - getColumnMappingMode(optimizeHandle.getMetadataEntry())); + getColumnMappingMode(optimizeHandle.getMetadataEntry(), optimizeHandle.getProtocolEntry())); appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumns, getExactColumnNames(optimizeHandle.getMetadataEntry()), false); transactionLogWriter.flush(); @@ -2180,14 +2197,22 @@ private void checkWriteSupported(DeltaLakeTableHandle handle) { checkSupportedWriterVersion(handle); checkUnsupportedGeneratedColumns(handle.getMetadataEntry()); - ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry()); + ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry(), handle.getProtocolEntry()); if (!(columnMappingMode == NONE || columnMappingMode == ColumnMappingMode.NAME || columnMappingMode == ColumnMappingMode.ID)) { throw new TrinoException(NOT_SUPPORTED, "Writing with column mapping %s is not supported".formatted(columnMappingMode)); } - if (getColumnIdentities(handle.getMetadataEntry()).values().stream().anyMatch(identity -> identity)) { + if 
(getColumnIdentities(handle.getMetadataEntry(), handle.getProtocolEntry()).values().stream().anyMatch(identity -> identity)) { throw new TrinoException(NOT_SUPPORTED, "Writing to tables with identity columns is not supported"); } - // TODO: Check writer-features + checkUnsupportedWriterFeatures(handle.getProtocolEntry()); + } + + private static void checkUnsupportedWriterFeatures(ProtocolEntry protocolEntry) + { + Set unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of())); + if (!unsupportedWriterFeatures.isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Unsupported writer features: " + unsupportedWriterFeatures); + } } private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry) @@ -2361,7 +2386,7 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta boolean changeDataFeedEnabled = (Boolean) properties.get(CHANGE_DATA_FEED_ENABLED_PROPERTY) .orElseThrow(() -> new IllegalArgumentException("The change_data_feed_enabled property cannot be empty")); if (changeDataFeedEnabled) { - Set columnNames = getColumns(handle.getMetadataEntry()).stream().map(DeltaLakeColumnHandle::getBaseColumnName).collect(toImmutableSet()); + Set columnNames = getColumns(handle.getMetadataEntry(), handle.getProtocolEntry()).stream().map(DeltaLakeColumnHandle::getBaseColumnName).collect(toImmutableSet()); Set conflicts = Sets.intersection(columnNames, CHANGE_DATA_FEED_COLUMN_NAMES); if (!conflicts.isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Unable to enable change data feed because table contains %s columns".formatted(conflicts)); @@ -2564,7 +2589,7 @@ public Optional> applyFilter(C DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) handle; SchemaTableName tableName = tableHandle.getSchemaTableName(); - Set partitionColumns = ImmutableSet.copyOf(extractPartitionColumns(tableHandle.getMetadataEntry(), typeManager)); + Set partitionColumns = ImmutableSet.copyOf(extractPartitionColumns(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager)); Map constraintDomains = constraint.getSummary().getDomains().orElseThrow(() -> new IllegalArgumentException("constraint summary is NONE")); ImmutableMap.Builder enforceableDomains = ImmutableMap.builder(); @@ -2690,7 +2715,7 @@ public Optional> applyProjecti else { // Create a new column handle DeltaLakeColumnHandle oldColumnHandle = (DeltaLakeColumnHandle) assignments.get(projectedColumn.getVariable().getName()); - projectedColumnHandle = projectColumn(oldColumnHandle, projectedColumn.getDereferenceIndices(), expression.getType(), getColumnMappingMode(deltaLakeTableHandle.getMetadataEntry())); + projectedColumnHandle = projectColumn(oldColumnHandle, projectedColumn.getDereferenceIndices(), expression.getType(), getColumnMappingMode(deltaLakeTableHandle.getMetadataEntry(), deltaLakeTableHandle.getProtocolEntry())); projectedColumnName = projectedColumnHandle.getQualifiedPhysicalName(); } @@ -2864,7 +2889,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession alreadyAnalyzedModifiedTimeMax.orElse(EPOCH))); } - List columnsMetadata = extractSchema(metadata, typeManager); + List columnsMetadata = extractSchema(metadata, handle.getProtocolEntry(), typeManager); Set allColumnNames = columnsMetadata.stream().map(columnMetadata -> columnMetadata.getName().toLowerCase(ENGLISH)).collect(Collectors.toSet()); Optional> analyzeColumnNames = getColumnNames(analyzeProperties); if (analyzeColumnNames.isPresent()) { @@ 
-3021,7 +3046,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH generateMissingFileStatistics(session, tableHandle, computedStatistics); } Optional maxFileModificationTime = getMaxFileModificationTime(computedStatistics); - Map physicalColumnNameMapping = extractSchema(tableHandle.getMetadataEntry(), typeManager).stream() + Map physicalColumnNameMapping = extractSchema(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager).stream() .collect(toImmutableMap(DeltaLakeColumnMetadata::getName, DeltaLakeColumnMetadata::getPhysicalName)); updateTableStatistics( session, @@ -3052,7 +3077,7 @@ private void generateMissingFileStatistics(ConnectorSession session, DeltaLakeTa return; } - Map lowercaseToColumnsHandles = getColumns(tableHandle.getMetadataEntry()).stream() + Map lowercaseToColumnsHandles = getColumns(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry()).stream() .filter(column -> column.getColumnType() == REGULAR) .collect(toImmutableMap(columnHandle -> columnHandle.getBaseColumnName().toLowerCase(ENGLISH), identity())); @@ -3321,7 +3346,7 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa public Optional applyDelete(ConnectorSession session, ConnectorTableHandle handle) { DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) handle; - if (tableHandle.getMetadataEntry().isChangeDataFeedEnabled().orElse(false)) { + if (changeDataFeedEnabled(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry()).orElse(false)) { // For tables with CDF enabled the DELETE operation can't be performed only on metadata files return Optional.empty(); } @@ -3338,7 +3363,7 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle, String operation) { DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) handle; - if (isAppendOnly(tableHandle.getMetadataEntry())) { + if (isAppendOnly(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry())) { throw new TrinoException(NOT_SUPPORTED, "Cannot modify rows from a table with '" + APPEND_ONLY_CONFIGURATION_KEY + "' set to true"); } checkWriteAllowed(session, tableHandle); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java index 3d425e12cba2..69dacb0d331f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java @@ -21,6 +21,7 @@ import io.trino.plugin.deltalake.procedure.DeltaLakeTableExecuteHandle; import io.trino.plugin.deltalake.procedure.DeltaTableOptimizeHandle; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.hive.NodeVersion; import io.trino.spi.PageIndexerFactory; import io.trino.spi.connector.ConnectorInsertTableHandle; @@ -119,7 +120,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa { DeltaLakeInsertTableHandle tableHandle = (DeltaLakeInsertTableHandle) insertTableHandle; MetadataEntry metadataEntry = tableHandle.getMetadataEntry(); - DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, typeManager); + DeltaLakeParquetSchemaMapping 
parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, tableHandle.getProtocolEntry(), typeManager); return new DeltaLakePageSink( typeManager.getTypeOperators(), tableHandle.getInputColumns(), @@ -142,7 +143,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa switch (executeHandle.getProcedureId()) { case OPTIMIZE: DeltaTableOptimizeHandle optimizeHandle = (DeltaTableOptimizeHandle) executeHandle.getProcedureHandle(); - DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(optimizeHandle.getMetadataEntry(), typeManager); + DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(optimizeHandle.getMetadataEntry(), optimizeHandle.getProtocolEntry(), typeManager); return new DeltaLakePageSink( typeManager.getTypeOperators(), optimizeHandle.getTableColumns(), @@ -167,7 +168,7 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction DeltaLakeMergeTableHandle merge = (DeltaLakeMergeTableHandle) mergeHandle; DeltaLakeInsertTableHandle tableHandle = merge.getInsertTableHandle(); ConnectorPageSink pageSink = createPageSink(transactionHandle, session, tableHandle, pageSinkId); - DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(tableHandle.getMetadataEntry(), typeManager); + DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager); return new DeltaLakeMergeSink( typeManager.getTypeOperators(), @@ -183,7 +184,7 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction tableHandle.getInputColumns(), domainCompactionThreshold, () -> createCdfPageSink(merge, session), - changeDataFeedEnabled(tableHandle.getMetadataEntry()), + changeDataFeedEnabled(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry()).orElse(false), parquetSchemaMapping); } @@ -192,8 +193,9 @@ private DeltaLakeCdfPageSink createCdfPageSink( ConnectorSession session) { MetadataEntry metadataEntry = mergeTableHandle.getTableHandle().getMetadataEntry(); + ProtocolEntry protocolEntry = mergeTableHandle.getTableHandle().getProtocolEntry(); Set partitionKeys = mergeTableHandle.getTableHandle().getMetadataEntry().getOriginalPartitionColumns().stream().collect(toImmutableSet()); - List tableColumns = extractSchema(metadataEntry, typeManager).stream() + List tableColumns = extractSchema(metadataEntry, protocolEntry, typeManager).stream() .map(metadata -> new DeltaLakeColumnHandle( metadata.getName(), metadata.getType(), @@ -216,7 +218,7 @@ private DeltaLakeCdfPageSink createCdfPageSink( .build(); Location tableLocation = Location.of(mergeTableHandle.getTableHandle().getLocation()); - DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, typeManager, true); + DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, protocolEntry, typeManager, true); return new DeltaLakeCdfPageSink( typeManager.getTypeOperators(), diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java index 913144a0dd7e..a5c8944f95e3 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java @@ -148,11 +148,11 @@ public 
ConnectorPageSource createPageSource( .collect(toImmutableList()); Map> partitionKeys = split.getPartitionKeys(); - ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry()); + ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry(), table.getProtocolEntry()); Optional> partitionValues = Optional.empty(); if (deltaLakeColumns.stream().anyMatch(column -> column.getBaseColumnName().equals(ROW_ID_COLUMN_NAME))) { partitionValues = Optional.of(new ArrayList<>()); - for (DeltaLakeColumnMetadata column : extractSchema(table.getMetadataEntry(), typeManager)) { + for (DeltaLakeColumnMetadata column : extractSchema(table.getMetadataEntry(), table.getProtocolEntry(), typeManager)) { Optional value = switch (columnMappingMode) { case NONE: yield partitionKeys.get(column.getName()); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeParquetSchemas.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeParquetSchemas.java index 0f5d2ee25cf8..b1e290ac6086 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeParquetSchemas.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeParquetSchemas.java @@ -21,6 +21,7 @@ import io.airlift.json.ObjectMapperProvider; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.spi.Location; import io.trino.spi.TrinoException; import io.trino.spi.type.DecimalType; @@ -86,14 +87,14 @@ public final class DeltaLakeParquetSchemas private DeltaLakeParquetSchemas() {} - public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, TypeManager typeManager) + public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager) { - return createParquetSchemaMapping(metadataEntry, typeManager, false); + return createParquetSchemaMapping(metadataEntry, protocolEntry, typeManager, false); } - public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, TypeManager typeManager, boolean addChangeDataFeedFields) + public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager, boolean addChangeDataFeedFields) { - DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry); + DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry, protocolEntry); return createParquetSchemaMapping( metadataEntry.getSchemaString(), typeManager, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index b8d16d606fdd..7350c56a0f77 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -173,7 +173,7 @@ private Stream getSplits( .map(DeltaLakeColumnHandle.class::cast)) .map(DeltaLakeColumnHandle::getBaseColumnName) .collect(toImmutableSet()); - List schema = extractSchema(tableHandle.getMetadataEntry(), typeManager); + List schema = 
extractSchema(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager); List predicatedColumns = schema.stream() .filter(column -> predicatedColumnNames.contains(column.getName())) .collect(toImmutableList()); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java index 839311885c5b..150d2661750a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java @@ -19,6 +19,7 @@ import io.airlift.units.DataSize; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import java.util.List; import java.util.Optional; @@ -30,6 +31,7 @@ public class DeltaTableOptimizeHandle extends DeltaTableProcedureHandle { private final MetadataEntry metadataEntry; + private final ProtocolEntry protocolEntry; private final List tableColumns; private final List originalPartitionColumns; private final DataSize maxScannedFileSize; @@ -39,6 +41,7 @@ public class DeltaTableOptimizeHandle @JsonCreator public DeltaTableOptimizeHandle( MetadataEntry metadataEntry, + ProtocolEntry protocolEntry, List tableColumns, List originalPartitionColumns, DataSize maxScannedFileSize, @@ -46,6 +49,7 @@ public DeltaTableOptimizeHandle( boolean retriesEnabled) { this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null"); + this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null"); this.tableColumns = ImmutableList.copyOf(requireNonNull(tableColumns, "tableColumns is null")); this.originalPartitionColumns = ImmutableList.copyOf(requireNonNull(originalPartitionColumns, "originalPartitionColumns is null")); this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null"); @@ -58,6 +62,7 @@ public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion) checkState(this.currentVersion.isEmpty(), "currentVersion already set"); return new DeltaTableOptimizeHandle( metadataEntry, + protocolEntry, tableColumns, originalPartitionColumns, maxScannedFileSize, @@ -71,6 +76,12 @@ public MetadataEntry getMetadataEntry() return metadataEntry; } + @JsonProperty + public ProtocolEntry getProtocolEntry() + { + return protocolEntry; + } + @JsonProperty public List getTableColumns() { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index 138afe6ec6f4..241a4e81cb54 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -14,6 +14,7 @@ package io.trino.plugin.deltalake.procedure; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import com.google.inject.Provider; import io.airlift.log.Logger; @@ -60,6 +61,7 @@ import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION; import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle; import static 
io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -175,11 +177,15 @@ private void doVacuum( accessControl.checkCanDeleteFromTable(null, tableName); TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(tableName, handle.getLocation(), session); - // TODO https://github.com/trinodb/trino/issues/15873 Check writer features when supporting writer version 7 ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot); if (protocolEntry.getMinWriterVersion() > MAX_WRITER_VERSION) { throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.getMinWriterVersion())); } + Set unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of())); + if (!unsupportedWriterFeatures.isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(unsupportedWriterFeatures)); + } + String tableLocation = tableSnapshot.getTableLocation(); String transactionLogDir = getTransactionLogDir(tableLocation); TrinoFileSystem fileSystem = fileSystemFactory.create(session); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java index b297a55d1124..ec080238e7ab 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java @@ -76,7 +76,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab double numRecords = 0L; MetadataEntry metadata = tableHandle.getMetadataEntry(); - List columnMetadata = DeltaLakeSchemaSupport.extractSchema(metadata, typeManager); + List columnMetadata = DeltaLakeSchemaSupport.extractSchema(metadata, tableHandle.getProtocolEntry(), typeManager); List columns = columnMetadata.stream() .map(columnMeta -> new DeltaLakeColumnHandle( columnMeta.getName(), diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index fe8a154ee18f..195fb3f57af4 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -54,13 +54,16 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Streams.stream; 
+import static com.google.common.primitives.Booleans.countTrue; import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; +import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; @@ -95,6 +98,13 @@ private DeltaLakeSchemaSupport() {} .add("timestampNtz") .add("deletionVectors") .build(); + private static final Set SUPPORTED_WRITER_FEATURES = ImmutableSet.builder() + .add("appendOnly") + .add("invariants") + .add("checkConstraints") + .add("changeDataFeed") + .add("columnMapping") + .build(); public enum ColumnMappingMode { @@ -120,18 +130,35 @@ public enum ColumnMappingMode private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get(); - public static boolean isAppendOnly(MetadataEntry metadataEntry) + public static boolean isAppendOnly(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { + if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains("appendOnly")) { + return false; + } return parseBoolean(metadataEntry.getConfiguration().getOrDefault(APPEND_ONLY_CONFIGURATION_KEY, "false")); } - public static boolean isDeletionVectorEnabled(MetadataEntry metadataEntry) + public static boolean isDeletionVectorEnabled(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { + if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains("deletionVectors")) { + return false; + } return parseBoolean(metadataEntry.getConfiguration().get(DELETION_VECTORS_CONFIGURATION_KEY)); } - public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata) + public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata, ProtocolEntry protocolEntry) { + if (protocolEntry.supportsReaderFeatures() || protocolEntry.supportsWriterFeatures()) { + boolean supportsColumnMappingReader = protocolEntry.readerFeaturesContains("columnMapping"); + boolean supportsColumnMappingWriter = protocolEntry.writerFeaturesContains("columnMapping"); + int columnMappingEnabled = countTrue(supportsColumnMappingReader, supportsColumnMappingWriter); + checkArgument( + columnMappingEnabled == 0 || columnMappingEnabled == 2, + "Both reader and writer features should have the same value for 'columnMapping'. 
reader: %s, writer: %s", supportsColumnMappingReader, supportsColumnMappingWriter); + if (columnMappingEnabled == 0) { + return ColumnMappingMode.NONE; + } + } String columnMappingMode = metadata.getConfiguration().getOrDefault(COLUMN_MAPPING_MODE_CONFIGURATION_KEY, "none"); return Enums.getIfPresent(ColumnMappingMode.class, columnMappingMode.toUpperCase(ENGLISH)).or(ColumnMappingMode.UNKNOWN); } @@ -143,9 +170,9 @@ public static int getMaxColumnId(MetadataEntry metadata) return Integer.parseInt(maxColumnId); } - public static List extractPartitionColumns(MetadataEntry metadataEntry, TypeManager typeManager) + public static List extractPartitionColumns(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager) { - return extractPartitionColumns(extractSchema(metadataEntry, typeManager), metadataEntry.getOriginalPartitionColumns()); + return extractPartitionColumns(extractSchema(metadataEntry, protocolEntry, typeManager), metadataEntry.getOriginalPartitionColumns()); } public static List extractPartitionColumns(List schema, List originalPartitionColumns) @@ -358,16 +385,16 @@ public static String serializeStatsAsJson(DeltaLakeFileStatistics fileStatistics return OBJECT_MAPPER.writeValueAsString(fileStatistics); } - public static List extractColumnMetadata(MetadataEntry metadataEntry, TypeManager typeManager) + public static List extractColumnMetadata(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager) { - return extractSchema(metadataEntry, typeManager).stream() + return extractSchema(metadataEntry, protocolEntry, typeManager).stream() .map(DeltaLakeColumnMetadata::getColumnMetadata) .collect(toImmutableList()); } - public static List extractSchema(MetadataEntry metadataEntry, TypeManager typeManager) + public static List extractSchema(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager) { - ColumnMappingMode mappingMode = getColumnMappingMode(metadataEntry); + ColumnMappingMode mappingMode = getColumnMappingMode(metadataEntry, protocolEntry); verifySupportedColumnMapping(mappingMode); return Optional.ofNullable(metadataEntry.getSchemaString()) .map(json -> getColumnMetadata(json, typeManager, mappingMode)) @@ -452,8 +479,11 @@ public static Map getColumnsNullability(MetadataEntry metadataE return getColumnProperties(metadataEntry, node -> node.get("nullable").asBoolean()); } - public static Map getColumnIdentities(MetadataEntry metadataEntry) + public static Map getColumnIdentities(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { + if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains("identityColumns")) { + return ImmutableMap.of(); + } return getColumnProperties(metadataEntry, DeltaLakeSchemaSupport::isIdentityColumn); } @@ -463,11 +493,24 @@ private static boolean isIdentityColumn(JsonNode node) .anyMatch(name -> name.startsWith("delta.identity.")); } - public static Map getColumnInvariants(MetadataEntry metadataEntry) + public static Map getColumnInvariants(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { + if (protocolEntry.supportsWriterFeatures()) { + if (!protocolEntry.writerFeaturesContains("invariants")) { + return ImmutableMap.of(); + } + return getColumnProperties(metadataEntry, DeltaLakeSchemaSupport::getInvariantsWriterFeature); + } return getColumnProperties(metadataEntry, DeltaLakeSchemaSupport::getInvariants); } + @Nullable + private static String getInvariantsWriterFeature(JsonNode node) + { + JsonNode invariants = 
node.get("metadata").get("delta.invariants"); + return invariants == null ? null : invariants.asText(); + } + @Nullable private static String getInvariants(JsonNode node) { @@ -497,17 +540,26 @@ private static String getGeneratedColumnExpressions(JsonNode node) return generationExpression == null ? null : generationExpression.asText(); } - public static Map getCheckConstraints(MetadataEntry metadataEntry) + public static Map getCheckConstraints(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { + if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains("checkConstraints")) { + return ImmutableMap.of(); + } return metadataEntry.getConfiguration().entrySet().stream() .filter(entry -> entry.getKey().startsWith("delta.constraints.")) .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); } - public static boolean changeDataFeedEnabled(MetadataEntry metadataEntry) + public static Optional changeDataFeedEnabled(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { - String enableChangeDataFeed = metadataEntry.getConfiguration().getOrDefault("delta.enableChangeDataFeed", "false"); - return parseBoolean(enableChangeDataFeed); + if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains("changeDataFeed")) { + return Optional.empty(); + } + String enableChangeDataFeed = metadataEntry.getConfiguration().get(DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY); + if (enableChangeDataFeed == null) { + return Optional.empty(); + } + return Optional.of(parseBoolean(enableChangeDataFeed)); } public static Map> getColumnsMetadata(MetadataEntry metadataEntry) @@ -555,6 +607,11 @@ public static Set unsupportedReaderFeatures(Set features) return Sets.difference(features, SUPPORTED_READER_FEATURES); } + public static Set unsupportedWriterFeatures(Set features) + { + return Sets.difference(features, SUPPORTED_WRITER_FEATURES); + } + public static Type deserializeType(TypeManager typeManager, Object type, boolean usePhysicalName) { try { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/MetadataEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/MetadataEntry.java index 8dfce9ba0def..6bd31e60a217 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/MetadataEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/MetadataEntry.java @@ -161,22 +161,6 @@ public Optional getCheckpointInterval() } } - @JsonIgnore - public Optional isChangeDataFeedEnabled() - { - if (this.getConfiguration() == null) { - return Optional.empty(); - } - - String value = this.getConfiguration().get(DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY); - if (value == null) { - return Optional.empty(); - } - - boolean changeDataFeedEnabled = Boolean.parseBoolean(value); - return Optional.of(changeDataFeedEnabled); - } - public static Map configurationForNewTable( Optional checkpointInterval, Optional changeDataFeedEnabled, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java index 47d1dfb47a2d..7159b22e6b61 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java @@ -76,6 +76,26 @@ public Optional> getWriterFeatures() 
return writerFeatures; } + public boolean supportsReaderFeatures() + { + return minReaderVersion >= MIN_VERSION_SUPPORTS_READER_FEATURES; + } + + public boolean readerFeaturesContains(String featureName) + { + return readerFeatures.map(features -> features.contains(featureName)).orElse(false); + } + + public boolean supportsWriterFeatures() + { + return minWriterVersion >= MIN_VERSION_SUPPORTS_WRITER_FEATURES; + } + + public boolean writerFeaturesContains(String featureName) + { + return writerFeatures.map(features -> features.contains(featureName)).orElse(false); + } + @Override public boolean equals(Object o) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java index fcc5219f4eef..61c8197cf7ab 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java @@ -31,6 +31,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Set; @@ -43,6 +44,7 @@ import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA; +import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -179,30 +181,32 @@ public Stream getCheckpointTransactionLogEntries( LastCheckpoint checkpoint = lastCheckpoint.get(); // Add entries contain statistics. When struct statistics are used the format of the Parquet file depends on the schema. It is important to use the schema at the time // of the Checkpoint creation, in case the schema has evolved since it was written. - Optional metadataEntry = entryTypes.contains(ADD) ? 
- Optional.of(getCheckpointMetadataEntry( - session, - checkpointSchemaManager, - typeManager, - fileSystem, - stats, - checkpoint)) : - Optional.empty(); + Optional metadataAndProtocol = Optional.empty(); + if (entryTypes.contains(ADD)) { + metadataAndProtocol = Optional.of(getCheckpointMetadataAndProtocolEntries( + session, + checkpointSchemaManager, + typeManager, + fileSystem, + stats, + checkpoint)); + } Stream resultStream = Stream.empty(); for (Location checkpointPath : getCheckpointPartPaths(checkpoint)) { TrinoInputFile checkpointFile = fileSystem.newInputFile(checkpointPath); resultStream = Stream.concat( resultStream, - getCheckpointTransactionLogEntries( + stream(getCheckpointTransactionLogEntries( session, entryTypes, - metadataEntry, + metadataAndProtocol.map(MetadataAndProtocolEntry::metadataEntry), + metadataAndProtocol.map(MetadataAndProtocolEntry::protocolEntry), checkpointSchemaManager, typeManager, stats, checkpoint, - checkpointFile)); + checkpointFile))); } return resultStream; } @@ -212,10 +216,11 @@ public Optional getLastCheckpointVersion() return lastCheckpoint.map(LastCheckpoint::getVersion); } - private Stream getCheckpointTransactionLogEntries( + private Iterator getCheckpointTransactionLogEntries( ConnectorSession session, Set entryTypes, Optional metadataEntry, + Optional protocolEntry, CheckpointSchemaManager checkpointSchemaManager, TypeManager typeManager, FileFormatDataSourceStats stats, @@ -230,7 +235,7 @@ private Stream getCheckpointTransactionLogEntries( catch (FileNotFoundException e) { throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("%s mentions a non-existent checkpoint file for table: %s", checkpoint, table)); } - return stream(new CheckpointEntryIterator( + return new CheckpointEntryIterator( checkpointFile, session, fileSize, @@ -238,13 +243,14 @@ private Stream getCheckpointTransactionLogEntries( typeManager, entryTypes, metadataEntry, + protocolEntry, stats, parquetReaderOptions, checkpointRowStatisticsWritingEnabled, - domainCompactionThreshold)); + domainCompactionThreshold); } - private MetadataEntry getCheckpointMetadataEntry( + private MetadataAndProtocolEntry getCheckpointMetadataAndProtocolEntries( ConnectorSession session, CheckpointSchemaManager checkpointSchemaManager, TypeManager typeManager, @@ -253,23 +259,46 @@ private MetadataEntry getCheckpointMetadataEntry( LastCheckpoint checkpoint) throws IOException { + MetadataEntry metadata = null; + ProtocolEntry protocol = null; for (Location checkpointPath : getCheckpointPartPaths(checkpoint)) { TrinoInputFile checkpointFile = fileSystem.newInputFile(checkpointPath); - Stream metadataEntries = getCheckpointTransactionLogEntries( + Iterator entries = getCheckpointTransactionLogEntries( session, - ImmutableSet.of(METADATA), + ImmutableSet.of(METADATA, PROTOCOL), + Optional.empty(), Optional.empty(), checkpointSchemaManager, typeManager, stats, checkpoint, checkpointFile); - Optional metadataEntry = metadataEntries.findFirst(); - if (metadataEntry.isPresent()) { - return metadataEntry.get().getMetaData(); + while (entries.hasNext()) { + DeltaLakeTransactionLogEntry entry = entries.next(); + if (metadata == null && entry.getMetaData() != null) { + metadata = entry.getMetaData(); + } + if (protocol == null && entry.getProtocol() != null) { + protocol = entry.getProtocol(); + } + if (metadata != null && protocol != null) { + break; + } } } - throw new TrinoException(DELTA_LAKE_BAD_DATA, "Checkpoint found without metadata entry: " + checkpoint); + if (metadata == null || protocol == 
null) { + throw new TrinoException(DELTA_LAKE_BAD_DATA, "Checkpoint found without metadata and protocol entry: " + checkpoint); + } + return new MetadataAndProtocolEntry(metadata, protocol); + } + + private record MetadataAndProtocolEntry(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) + { + private MetadataAndProtocolEntry + { + requireNonNull(metadataEntry, "metadataEntry is null"); + requireNonNull(protocolEntry, "protocolEntry is null"); + } } private List getCheckpointPartPaths(LastCheckpoint checkpoint) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index 272e5c513824..0308d6373d70 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -268,9 +268,9 @@ private List loadActiveFiles(TableSnapshot tableSnapshot, Connecto } } - public static List columnsWithStats(MetadataEntry metadataEntry, TypeManager typeManager) + public static List columnsWithStats(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager) { - return columnsWithStats(DeltaLakeSchemaSupport.extractSchema(metadataEntry, typeManager), metadataEntry.getOriginalPartitionColumns()); + return columnsWithStats(DeltaLakeSchemaSupport.extractSchema(metadataEntry, protocolEntry, typeManager), metadataEntry.getOriginalPartitionColumns()); } public static ImmutableList columnsWithStats(List schema, List partitionColumns) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index b990b535c1aa..351c608c9fd2 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -136,6 +136,7 @@ public String getColumnName() private final List extractors; private final boolean checkpointRowStatisticsWritingEnabled; private MetadataEntry metadataEntry; + private ProtocolEntry protocolEntry; private List schema; // Use DeltaLakeColumnMetadata? 
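The protocolEntry field added above feeds the same check used throughout this patch: on a table at writer version 7 or later, a capability only counts when the protocol both supports writer features and lists the specific feature name. A minimal, self-contained sketch of that gating pattern follows; Protocol, WriterFeatureGateExample, and checkConstraints are illustrative stand-ins, not the connector's classes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Simplified stand-in for a protocol entry: with minWriterVersion >= 7 a writer capability
// only applies when it is explicitly listed in writerFeatures.
record Protocol(int minWriterVersion, Optional<Set<String>> writerFeatures)
{
    boolean supportsWriterFeatures()
    {
        return minWriterVersion >= 7;
    }

    boolean writerFeaturesContains(String feature)
    {
        return writerFeatures.map(features -> features.contains(feature)).orElse(false);
    }
}

public class WriterFeatureGateExample
{
    // Same shape as the gating applied to check constraints: on a version-7 table the
    // configuration keys are ignored unless the matching writer feature is declared.
    static Map<String, String> checkConstraints(Map<String, String> configuration, Protocol protocol)
    {
        if (protocol.supportsWriterFeatures() && !protocol.writerFeaturesContains("checkConstraints")) {
            return Map.of();
        }
        return configuration; // filtering of "delta.constraints." keys omitted for brevity
    }

    public static void main(String[] args)
    {
        Map<String, String> configuration = Map.of("delta.constraints.a_constraint", "a > 1");

        Protocol legacy = new Protocol(3, Optional.empty());
        Protocol withFeature = new Protocol(7, Optional.of(Set.of("checkConstraints")));
        Protocol withoutFeature = new Protocol(7, Optional.of(Set.of("appendOnly")));

        System.out.println(checkConstraints(configuration, legacy));         // constraint kept
        System.out.println(checkConstraints(configuration, withFeature));    // constraint kept
        System.out.println(checkConstraints(configuration, withoutFeature)); // empty map
    }
}
```

Tables below writer version 7 skip the feature-list check entirely, so legacy behavior is unchanged.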
private Page page; private long pageIndex; @@ -149,6 +150,7 @@ public CheckpointEntryIterator( TypeManager typeManager, Set fields, Optional metadataEntry, + Optional protocolEntry, FileFormatDataSourceStats stats, ParquetReaderOptions parquetReaderOptions, boolean checkpointRowStatisticsWritingEnabled, @@ -172,11 +174,13 @@ public CheckpointEntryIterator( if (fields.contains(ADD)) { checkArgument(metadataEntry.isPresent(), "Metadata entry must be provided when reading ADD entries from Checkpoint files"); this.metadataEntry = metadataEntry.get(); - this.schema = extractSchema(this.metadataEntry, typeManager); + checkArgument(protocolEntry.isPresent(), "Protocol entry must be provided when reading ADD entries from Checkpoint files"); + this.protocolEntry = protocolEntry.get(); + this.schema = extractSchema(this.metadataEntry, this.protocolEntry, typeManager); } List columns = fields.stream() - .map(field -> buildColumnHandle(field, checkpointSchemaManager, this.metadataEntry).toHiveColumnHandle()) + .map(field -> buildColumnHandle(field, checkpointSchemaManager, this.metadataEntry, this.protocolEntry).toHiveColumnHandle()) .collect(toImmutableList()); TupleDomain tupleDomain = columns.size() > 1 ? @@ -205,11 +209,11 @@ public CheckpointEntryIterator( .collect(toImmutableList()); } - private DeltaLakeColumnHandle buildColumnHandle(EntryType entryType, CheckpointSchemaManager schemaManager, MetadataEntry metadataEntry) + private DeltaLakeColumnHandle buildColumnHandle(EntryType entryType, CheckpointSchemaManager schemaManager, MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { Type type = switch (entryType) { case TRANSACTION -> schemaManager.getTxnEntryType(); - case ADD -> schemaManager.getAddEntryType(metadataEntry, true, true); + case ADD -> schemaManager.getAddEntryType(metadataEntry, protocolEntry, true, true); case REMOVE -> schemaManager.getRemoveEntryType(); case METADATA -> schemaManager.getMetadataEntryType(); case PROTOCOL -> schemaManager.getProtocolEntryType(true, true); @@ -397,7 +401,7 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo if (block.isNull(pagePosition)) { return null; } - boolean deletionVectorsEnabled = isDeletionVectorEnabled(metadataEntry); + boolean deletionVectorsEnabled = isDeletionVectorEnabled(metadataEntry, protocolEntry); Block addEntryBlock = block.getObject(pagePosition, Block.class); log.debug("Block %s has %s fields", block, addEntryBlock.getPositionCount()); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java index cdea8eca148a..eaea241a3291 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java @@ -17,6 +17,7 @@ import com.google.inject.Inject; import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.spi.type.ArrayType; import io.trino.spi.type.BigintType; import io.trino.spi.type.BooleanType; @@ -111,11 +112,11 @@ public RowType getMetadataEntryType() return metadataEntryType; } - public RowType getAddEntryType(MetadataEntry metadataEntry, boolean 
requireWriteStatsAsJson, boolean requireWriteStatsAsStruct) + public RowType getAddEntryType(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, boolean requireWriteStatsAsJson, boolean requireWriteStatsAsStruct) { - List allColumns = extractSchema(metadataEntry, typeManager); - List minMaxColumns = columnsWithStats(metadataEntry, typeManager); - boolean deletionVectorEnabled = isDeletionVectorEnabled(metadataEntry); + List allColumns = extractSchema(metadataEntry, protocolEntry, typeManager); + List minMaxColumns = columnsWithStats(metadataEntry, protocolEntry, typeManager); + boolean deletionVectorEnabled = isDeletionVectorEnabled(metadataEntry, protocolEntry); ImmutableList.Builder minMaxFields = ImmutableList.builder(); for (DeltaLakeColumnMetadata dataColumn : minMaxColumns) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java index 2e6a5c819197..168c46a9866f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java @@ -113,7 +113,7 @@ public void write(CheckpointEntries entries, TrinoOutputFile outputFile) RowType metadataEntryType = checkpointSchemaManager.getMetadataEntryType(); RowType protocolEntryType = checkpointSchemaManager.getProtocolEntryType(protocolEntry.getReaderFeatures().isPresent(), protocolEntry.getWriterFeatures().isPresent()); RowType txnEntryType = checkpointSchemaManager.getTxnEntryType(); - RowType addEntryType = checkpointSchemaManager.getAddEntryType(entries.getMetadataEntry(), writeStatsAsJson, writeStatsAsStruct); + RowType addEntryType = checkpointSchemaManager.getAddEntryType(entries.getMetadataEntry(), entries.getProtocolEntry(), writeStatsAsJson, writeStatsAsStruct); RowType removeEntryType = checkpointSchemaManager.getRemoveEntryType(); List columnNames = ImmutableList.of( @@ -149,7 +149,7 @@ public void write(CheckpointEntries entries, TrinoOutputFile outputFile) writeTransactionEntry(pageBuilder, txnEntryType, transactionEntry); } for (AddFileEntry addFileEntry : entries.getAddFileEntries()) { - writeAddFileEntry(pageBuilder, addEntryType, addFileEntry, entries.getMetadataEntry(), writeStatsAsJson, writeStatsAsStruct); + writeAddFileEntry(pageBuilder, addEntryType, addFileEntry, entries.getMetadataEntry(), entries.getProtocolEntry(), writeStatsAsJson, writeStatsAsStruct); } for (RemoveFileEntry removeFileEntry : entries.getRemoveFileEntries()) { writeRemoveFileEntry(pageBuilder, removeEntryType, removeFileEntry); @@ -222,7 +222,7 @@ private void writeTransactionEntry(PageBuilder pageBuilder, RowType entryType, T appendNullOtherBlocks(pageBuilder, TXN_BLOCK_CHANNEL); } - private void writeAddFileEntry(PageBuilder pageBuilder, RowType entryType, AddFileEntry addFileEntry, MetadataEntry metadataEntry, boolean writeStatsAsJson, boolean writeStatsAsStruct) + private void writeAddFileEntry(PageBuilder pageBuilder, RowType entryType, AddFileEntry addFileEntry, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, boolean writeStatsAsJson, boolean writeStatsAsStruct) { pageBuilder.declarePosition(); RowBlockBuilder blockBuilder = (RowBlockBuilder) pageBuilder.getBlockBuilder(ADD_BLOCK_CHANNEL); @@ -244,7 +244,7 @@ private void writeAddFileEntry(PageBuilder pageBuilder, RowType 
entryType, AddFi fieldId++; if (writeStatsAsJson) { - writeJsonStats(fieldBuilders.get(fieldId), entryType, addFileEntry, metadataEntry, fieldId); + writeJsonStats(fieldBuilders.get(fieldId), entryType, addFileEntry, metadataEntry, protocolEntry, fieldId); fieldId++; } @@ -260,13 +260,13 @@ private void writeAddFileEntry(PageBuilder pageBuilder, RowType entryType, AddFi appendNullOtherBlocks(pageBuilder, ADD_BLOCK_CHANNEL); } - private void writeJsonStats(BlockBuilder entryBlockBuilder, RowType entryType, AddFileEntry addFileEntry, MetadataEntry metadataEntry, int fieldId) + private void writeJsonStats(BlockBuilder entryBlockBuilder, RowType entryType, AddFileEntry addFileEntry, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, int fieldId) { String statsJson = null; if (addFileEntry.getStats().isPresent()) { DeltaLakeFileStatistics statistics = addFileEntry.getStats().get(); if (statistics instanceof DeltaLakeParquetFileStatistics parquetFileStatistics) { - Map columnTypeMapping = getColumnTypeMapping(metadataEntry); + Map columnTypeMapping = getColumnTypeMapping(metadataEntry, protocolEntry); DeltaLakeJsonFileStatistics jsonFileStatistics = new DeltaLakeJsonFileStatistics( parquetFileStatistics.getNumRecords(), parquetFileStatistics.getMinValues().map(values -> toJsonValues(columnTypeMapping, values)), @@ -281,9 +281,9 @@ private void writeJsonStats(BlockBuilder entryBlockBuilder, RowType entryType, A writeString(entryBlockBuilder, entryType, fieldId, "stats", statsJson); } - private Map getColumnTypeMapping(MetadataEntry deltaMetadata) + private Map getColumnTypeMapping(MetadataEntry deltaMetadata, ProtocolEntry protocolEntry) { - return extractSchema(deltaMetadata, typeManager).stream() + return extractSchema(deltaMetadata, protocolEntry, typeManager).stream() .collect(toImmutableMap(DeltaLakeColumnMetadata::getPhysicalName, DeltaLakeColumnMetadata::getPhysicalColumnType)); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index a024340de21d..765922201e18 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -548,7 +548,7 @@ public void testTimestampNtz() """); // TODO https://github.com/trinodb/trino/issues/15873 Support writing timestamp_ntz type when upgrading the max writer version to 7 - assertQueryFails("INSERT INTO timestamp_ntz VALUES NULL", "Table .* requires Delta Lake writer version 7 which is not supported"); + assertQueryFails("INSERT INTO timestamp_ntz VALUES NULL", "\\QUnsupported writer features: [timestampNtz]"); } /** @@ -590,7 +590,7 @@ public void testTimestampNtzPartitioned() // TODO https://github.com/trinodb/trino/issues/15873 Support writing timestamp_ntz type when upgrading the max writer version to 7 assertQueryFails( "INSERT INTO timestamp_ntz_partition VALUES (NULL, NULL)", - "Table .* requires Delta Lake writer version 7 which is not supported"); + "\\QUnsupported writer features: [timestampNtz]"); } /** diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMinioAndHmsConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMinioAndHmsConnectorSmokeTest.java index f875ad325963..6afc567cd846 100644 --- 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMinioAndHmsConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMinioAndHmsConnectorSmokeTest.java @@ -192,6 +192,28 @@ public void testDeltaColumnInvariant() assertQuery("SELECT * FROM " + tableName, "VALUES (1), (2)"); } + /** + * @see databricks122.invariants_writer_feature + */ + @Test + public void testDeltaColumnInvariantWriterFeature() + { + String tableName = "test_invariants_writer_feature_" + randomNameSuffix(); + hiveMinioDataLake.copyResources("databricks122/invariants_writer_feature", tableName); + assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(SCHEMA, tableName, getLocationForTable(bucketName, tableName))); + + assertQuery("SELECT * FROM " + tableName, "VALUES 1"); + assertUpdate("INSERT INTO " + tableName + " VALUES 2", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES 1, 2"); + + assertThatThrownBy(() -> query("INSERT INTO " + tableName + " VALUES 3")) + .hasMessageContaining("Check constraint violation: (\"col_invariants\" < 3)"); + assertThatThrownBy(() -> query("UPDATE " + tableName + " SET col_invariants = 3 WHERE col_invariants = 1")) + .hasMessageContaining("Check constraint violation: (\"col_invariants\" < 3)"); + + assertQuery("SELECT * FROM " + tableName, "VALUES 1, 2"); + } + @Test public void testSchemaEvolutionOnTableWithColumnInvariant() { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index 4f37d1050bcd..116318be70ce 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -606,7 +606,7 @@ public void testSnapshotsAreConsistent() } assertEquals(expectedDataFiles.size(), dataFilesWithFixedVersion.size()); - List columns = extractColumnMetadata(transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION), TESTING_TYPE_MANAGER); + List columns = extractColumnMetadata(transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION), transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot), TESTING_TYPE_MANAGER); for (int i = 0; i < expectedDataFiles.size(); i++) { AddFileEntry expected = expectedDataFiles.get(i); AddFileEntry actual = dataFilesWithFixedVersion.get(i); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java index e22568e403df..fc197b3bc8fe 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java @@ -118,7 +118,7 @@ public void testReadProtocolEntries() throws Exception { URI checkpointUri = getResource(TEST_CHECKPOINT).toURI(); - CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(PROTOCOL), Optional.empty()); + CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(PROTOCOL), Optional.empty(), Optional.empty()); List entries = 
ImmutableList.copyOf(checkpointEntryIterator); assertThat(entries).hasSize(1); @@ -136,7 +136,7 @@ public void testReadAddEntries() throws Exception { URI checkpointUri = getResource(TEST_CHECKPOINT).toURI(); - CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(ADD), Optional.of(readMetadataEntry(checkpointUri))); + CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(ADD), Optional.of(readMetadataEntry(checkpointUri)), Optional.of(readProtocolEntry(checkpointUri))); List entries = ImmutableList.copyOf(checkpointEntryIterator); assertThat(entries).hasSize(9); @@ -185,7 +185,8 @@ public void testReadAllEntries() CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator( checkpointUri, ImmutableSet.of(METADATA, PROTOCOL, TRANSACTION, ADD, REMOVE, COMMIT), - Optional.of(readMetadataEntry(checkpointUri))); + Optional.of(readMetadataEntry(checkpointUri)), + Optional.of(readProtocolEntry(checkpointUri))); List entries = ImmutableList.copyOf(checkpointEntryIterator); assertThat(entries).hasSize(17); @@ -308,11 +309,12 @@ public void testSkipRemoveEntries() CheckpointEntryIterator addEntryIterator = createCheckpointEntryIterator( URI.create(targetPath), ImmutableSet.of(ADD), - Optional.of(metadataEntry)); + Optional.of(metadataEntry), + Optional.of(protocolEntry)); CheckpointEntryIterator removeEntryIterator = - createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(REMOVE), Optional.empty()); + createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(REMOVE), Optional.empty(), Optional.empty()); CheckpointEntryIterator txnEntryIterator = - createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(TRANSACTION), Optional.empty()); + createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(TRANSACTION), Optional.empty(), Optional.empty()); assertThat(Iterators.size(addEntryIterator)).isEqualTo(1); assertThat(Iterators.size(removeEntryIterator)).isEqualTo(numRemoveEntries); @@ -326,11 +328,18 @@ public void testSkipRemoveEntries() private MetadataEntry readMetadataEntry(URI checkpointUri) throws IOException { - CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(METADATA), Optional.empty()); + CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(METADATA), Optional.empty(), Optional.empty()); return Iterators.getOnlyElement(checkpointEntryIterator).getMetaData(); } - private CheckpointEntryIterator createCheckpointEntryIterator(URI checkpointUri, Set entryTypes, Optional metadataEntry) + private ProtocolEntry readProtocolEntry(URI checkpointUri) + throws IOException + { + CheckpointEntryIterator checkpointEntryIterator = createCheckpointEntryIterator(checkpointUri, ImmutableSet.of(PROTOCOL), Optional.empty(), Optional.empty()); + return Iterators.getOnlyElement(checkpointEntryIterator).getProtocol(); + } + + private CheckpointEntryIterator createCheckpointEntryIterator(URI checkpointUri, Set entryTypes, Optional metadataEntry, Optional protocolEntry) throws IOException { TrinoFileSystem fileSystem = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS).create(SESSION); @@ -344,6 +353,7 @@ private CheckpointEntryIterator createCheckpointEntryIterator(URI checkpointUri, TESTING_TYPE_MANAGER, entryTypes, metadataEntry, + protocolEntry, new FileFormatDataSourceStats(), new 
ParquetReaderConfig().toParquetReaderOptions(), true, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java index 0e7edd7d3d14..21d7215fe50c 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java @@ -202,7 +202,7 @@ public void testCheckpointWriteReadJsonRoundtrip() targetFile.delete(); // file must not exist when writer is called writer.write(entries, createOutputFile(targetPath)); - CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, true); + CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, protocolEntry, true); assertEquals(readEntries.getTransactionEntries(), entries.getTransactionEntries()); assertEquals(readEntries.getRemoveFileEntries(), entries.getRemoveFileEntries()); assertEquals(readEntries.getMetadataEntry(), entries.getMetadataEntry()); @@ -339,7 +339,7 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip() targetFile.delete(); // file must not exist when writer is called writer.write(entries, createOutputFile(targetPath)); - CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, true); + CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, protocolEntry, true); assertEquals(readEntries.getTransactionEntries(), entries.getTransactionEntries()); assertEquals(readEntries.getRemoveFileEntries(), entries.getRemoveFileEntries()); assertEquals(readEntries.getMetadataEntry(), entries.getMetadataEntry()); @@ -411,7 +411,7 @@ public void testDisablingRowStatistics() targetFile.delete(); // file must not exist when writer is called writer.write(entries, createOutputFile(targetPath)); - CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, false); + CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, protocolEntry, false); AddFileEntry addFileEntry = getOnlyElement(readEntries.getAddFileEntries()); assertThat(addFileEntry.getStats()).isPresent(); @@ -480,7 +480,7 @@ else if (statsValue instanceof Slice slice) { return Optional.of(comparableStats.buildOrThrow()); } - private CheckpointEntries readCheckpoint(String checkpointPath, MetadataEntry metadataEntry, boolean rowStatisticsEnabled) + private CheckpointEntries readCheckpoint(String checkpointPath, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, boolean rowStatisticsEnabled) throws IOException { TrinoFileSystem fileSystem = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS).create(SESSION); @@ -494,6 +494,7 @@ private CheckpointEntries readCheckpoint(String checkpointPath, MetadataEntry me typeManager, ImmutableSet.of(METADATA, PROTOCOL, TRANSACTION, ADD, REMOVE), Optional.of(metadataEntry), + Optional.of(protocolEntry), new FileFormatDataSourceStats(), new ParquetReaderConfig().toParquetReaderOptions(), rowStatisticsEnabled, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java index e7a357aef285..eafcb83e772a 100644 --- 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/statistics/TestDeltaLakeFileStatistics.java @@ -22,6 +22,7 @@ import io.trino.plugin.deltalake.DeltaLakeConfig; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager; import io.trino.plugin.hive.FileFormatDataSourceStats; @@ -48,6 +49,7 @@ import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA; +import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; @@ -101,11 +103,26 @@ public void testParseParquetStatistics() typeManager, ImmutableSet.of(METADATA), Optional.empty(), + Optional.empty(), new FileFormatDataSourceStats(), new ParquetReaderConfig().toParquetReaderOptions(), true, new DeltaLakeConfig().getDomainCompactionThreshold()); MetadataEntry metadataEntry = getOnlyElement(metadataEntryIterator).getMetaData(); + CheckpointEntryIterator protocolEntryIterator = new CheckpointEntryIterator( + checkpointFile, + SESSION, + checkpointFile.length(), + checkpointSchemaManager, + typeManager, + ImmutableSet.of(PROTOCOL), + Optional.empty(), + Optional.empty(), + new FileFormatDataSourceStats(), + new ParquetReaderConfig().toParquetReaderOptions(), + true, + new DeltaLakeConfig().getDomainCompactionThreshold()); + ProtocolEntry protocolEntry = getOnlyElement(protocolEntryIterator).getProtocol(); CheckpointEntryIterator checkpointEntryIterator = new CheckpointEntryIterator( checkpointFile, @@ -115,6 +132,7 @@ public void testParseParquetStatistics() typeManager, ImmutableSet.of(CheckpointEntryIterator.EntryType.ADD), Optional.of(metadataEntry), + Optional.of(protocolEntry), new FileFormatDataSourceStats(), new ParquetReaderConfig().toParquetReaderOptions(), true, diff --git a/plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/README.md b/plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/README.md new file mode 100644 index 000000000000..cdf23861309c --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/README.md @@ -0,0 +1,29 @@ +Data generated using Databricks 12.2: +Using PySpark because Spark SQL doesn't support creating a table with column invariants. 
+ +```py +import pyspark.sql.types +from delta.tables import DeltaTable + +schema = pyspark.sql.types.StructType([ + pyspark.sql.types.StructField( + "col_invariants", + dataType = pyspark.sql.types.IntegerType(), + nullable = False, + metadata = { "delta.invariants": "col_invariants < 3" } + ) +]) + +table = DeltaTable.create(spark) \ + .tableName("test_invariants") \ + .addColumns(schema) \ + .location("s3://trino-ci-test/default/test_invariants") \ + .property("delta.feature.invariants", "supported") \ + .execute() + +spark.createDataFrame([(1,)], schema=schema).write.saveAsTable( + "test_invariants", + mode="append", + format="delta", +) +``` diff --git a/plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/_delta_log/00000000000000000000.json b/plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..a9db56379758 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1691571164476,"userId":"7853186923043731","userName":"yuya.ebihara@starburstdata.com","operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.feature.invariants\":\"supported\"}"},"notebook":{"notebookId":"2299734316069194"},"clusterId":"0705-101043-4cc9r1rt","isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"f394c5c6-6666-4f7d-b550-3ffb8c600a1f"}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":7,"writerFeatures":["invariants"]}} +{"metaData":{"id":"6f41a246-3858-45f0-b796-a0aae91eba36","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col_invariants\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{\"delta.invariants\":\"col_invariants < 3\"}}]}","partitionColumns":[],"configuration":{},"createdTime":1691571164139}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/_delta_log/00000000000000000001.json b/plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/_delta_log/00000000000000000001.json new file mode 100644 index 000000000000..6a4235511580 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1691571169547,"userId":"7853186923043731","userName":"yuya.ebihara@starburstdata.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"2299734316069194"},"clusterId":"0705-101043-4cc9r1rt","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"650"},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"583f8e25-7c75-4d46-9f9c-cc22fc801bd3"}} 
+{"add":{"path":"part-00001-3e38bdb2-fccc-4bd0-8f60-4ce39aa50ef9-c000.snappy.parquet","partitionValues":{},"size":650,"modificationTime":1691571170000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col_invariants\":1},\"maxValues\":{\"col_invariants\":1},\"nullCount\":{\"col_invariants\":0}}","tags":{"INSERTION_TIME":"1691571170000000","MIN_INSERTION_TIME":"1691571170000000","MAX_INSERTION_TIME":"1691571170000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/part-00001-3e38bdb2-fccc-4bd0-8f60-4ce39aa50ef9-c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/databricks122/invariants_writer_feature/part-00001-3e38bdb2-fccc-4bd0-8f60-4ce39aa50ef9-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..b48ee2ad89b59e24f0c7193bf49775959ad29a66 GIT binary patch literal 650 zcmZWnU2oGc6m>lq5g;U_+H#~wL>5$=)Ot-q*i!9 zRKW~Tj9TkbCM)z=RcdkPwMh=%PhJw?Y=Lq;h2-Y#$s~Fv2oYinaINbmt4eV@P=WG^ z*xy1lSGhtXuIAdPy85CDZ6E~$;(KQy&CN{9k!(yoPmJu#i%jQfBS#+=vV>x_^{-zD z*&NAAqo=yYMmf)Oh;szvExIrN5XjL&raCvE{`$Rjp?mBGa=HApB;wJQ^mh7JVDtpd zS{ZP*nPgDZ?q+n?Q=YGa!D^;@H>G)ASI6ecN&H=Zx?PPw^N22Eih%o&W#< literal 0 HcmV?d00001 diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java index 6535bc877b22..ffdd4e11f605 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java @@ -38,6 +38,7 @@ import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTableCommentOnDelta; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTableCommentOnTrino; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTablePropertyOnDelta; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.skipTestUnlessUnsupportedWriterVersionExists; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -73,20 +74,22 @@ public void testAddColumnWithCommentOnTrino() @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testAddColumnUnsupportedWriterVersion() { + skipTestUnlessUnsupportedWriterVersionExists(); + String tableName = "test_dl_add_column_unsupported_writer_" + randomNameSuffix(); String tableDirectory = "databricks-compatibility-test-" + tableName; onDelta().executeQuery(format("" + "CREATE TABLE default.%s (col int) " + "USING DELTA LOCATION 's3://%s/%s'" + - "TBLPROPERTIES ('delta.minWriterVersion'='7')", + "TBLPROPERTIES ('delta.minWriterVersion'='8')", tableName, bucketName, tableDirectory)); try { assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_col int")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 8 which is not supported"); } finally { dropDeltaTableWithRetry("default." 
+ tableName); @@ -186,20 +189,22 @@ public void testCommentOnTable() @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testCommentOnTableUnsupportedWriterVersion() { + skipTestUnlessUnsupportedWriterVersionExists(); + String tableName = "test_dl_comment_table_unsupported_writer_" + randomNameSuffix(); String tableDirectory = "databricks-compatibility-test-" + tableName; onDelta().executeQuery(format("" + "CREATE TABLE default.%s (col int) " + "USING DELTA LOCATION 's3://%s/%s'" + - "TBLPROPERTIES ('delta.minWriterVersion'='7')", + "TBLPROPERTIES ('delta.minWriterVersion'='8')", tableName, bucketName, tableDirectory)); try { assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test comment'")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 8 which is not supported"); } finally { onTrino().executeQuery("DROP TABLE delta.default." + tableName); @@ -232,20 +237,22 @@ public void testCommentOnColumn() @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testCommentOnColumnUnsupportedWriterVersion() { + skipTestUnlessUnsupportedWriterVersionExists(); + String tableName = "test_dl_comment_column_unsupported_writer_" + randomNameSuffix(); String tableDirectory = "databricks-compatibility-test-" + tableName; onDelta().executeQuery(format("" + "CREATE TABLE default.%s (col int) " + "USING DELTA LOCATION 's3://%s/%s'" + - "TBLPROPERTIES ('delta.minWriterVersion'='7')", + "TBLPROPERTIES ('delta.minWriterVersion'='8')", tableName, bucketName, tableDirectory)); try { assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".col IS 'test column comment'")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 8 which is not supported"); } finally { onTrino().executeQuery("DROP TABLE delta.default." + tableName); @@ -256,20 +263,22 @@ public void testCommentOnColumnUnsupportedWriterVersion() @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testOptimizeUnsupportedWriterVersion() { + skipTestUnlessUnsupportedWriterVersionExists(); + String tableName = "test_dl_optimize_unsupported_writer_" + randomNameSuffix(); String tableDirectory = "databricks-compatibility-test-" + tableName; onDelta().executeQuery(format("" + "CREATE TABLE default.%s (col int) " + "USING DELTA LOCATION 's3://%s/%s'" + - "TBLPROPERTIES ('delta.minWriterVersion'='7')", + "TBLPROPERTIES ('delta.minWriterVersion'='8')", tableName, bucketName, tableDirectory)); try { assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." 
+ tableName + " EXECUTE OPTIMIZE")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 8 which is not supported"); } finally { dropDeltaTableWithRetry(tableName); @@ -364,6 +373,33 @@ public void testTrinoPreservesReaderAndWriterVersions() } } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testTrinoPreservesTableFeature() + { + String tableName = "test_trino_preserves_table_feature_" + randomNameSuffix(); + + onDelta().executeQuery("CREATE TABLE default." + tableName + " (col int)" + + "USING DELTA " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + "TBLPROPERTIES ('delta.checkpointInterval'=1, 'delta.feature.columnMapping'='supported')"); + try { + onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".col IS 'test column comment'"); + onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test table comment'"); + onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_col INT"); + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 1)"); + onTrino().executeQuery("UPDATE delta.default." + tableName + " SET col = 2"); + onTrino().executeQuery("DELETE FROM delta.default." + tableName); + onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " + "ON (t.col = s.col) WHEN MATCHED THEN UPDATE SET new_col = 3"); + + assertThat(getTablePropertyOnDelta("default", tableName, "delta.feature.columnMapping")) + .isEqualTo("supported"); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testTrinoAlterTablePreservesGeneratedColumn() @@ -400,4 +436,36 @@ public void testTrinoAlterTablePreservesGeneratedColumn() dropDeltaTableWithRetry("default." + tableName); } } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testUnsupportedStatementWithUnsupportedWriterFeature() + { + String tableName = "test_dl_add_column_unsupported_writer_feature_" + randomNameSuffix(); + + onDelta().executeQuery("CREATE TABLE default." + tableName + "" + + "(a int, b int) " + + "USING DELTA " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + "TBLPROPERTIES ('delta.feature.generatedColumns'='supported')"); + try { + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_col int")) + .hasMessageContaining("Unsupported writer features: [generatedColumns]"); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN a TO renamed")) + .hasMessageContaining("Unsupported writer features: [generatedColumns]"); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." 
+ tableName + " DROP COLUMN b")) + .hasMessageContaining("Unsupported writer features: [generatedColumns]"); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " EXECUTE OPTIMIZE")) + .hasMessageContaining("Unsupported writer features: [generatedColumns]"); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ALTER COLUMN b SET DATA TYPE bigint")) + .hasMessageContaining("This connector does not support setting column types"); + assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test comment'")) + .hasMessageContaining("Unsupported writer features: [generatedColumns]"); + assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".a IS 'test column comment'")) + .hasMessageContaining("Unsupported writer features: [generatedColumns]"); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeChangeDataFeedCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeChangeDataFeedCompatibility.java index e6b747fa6a95..8598d708b11b 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeChangeDataFeedCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeChangeDataFeedCompatibility.java @@ -30,16 +30,20 @@ import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_104; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_113; import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91; import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTablePropertiesOnDelta; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; public class TestDeltaLakeChangeDataFeedCompatibility extends BaseTestDeltaLakeS3Storage @@ -93,6 +97,67 @@ public void testUpdateTableWithCdf(String columnMappingMode) } } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testUpdateTableWithChangeDataFeedWriterFeature() + { + String tableName = "test_change_data_feed_writer_feature_" + randomNameSuffix(); + onDelta().executeQuery("CREATE TABLE default." 
+ tableName + + "(col1 STRING, updated_column INT)" + + "USING DELTA " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + "TBLPROPERTIES ('delta.enableChangeDataFeed'=true, 'delta.minWriterVersion'=7)"); + try { + assertThat(onTrino().executeQuery("SHOW CREATE TABLE delta.default." + tableName).getOnlyValue().toString()).contains("change_data_feed_enabled = true"); + + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue1', 1), ('testValue2', 2), ('testValue3', 3)"); + onTrino().executeQuery("UPDATE delta.default." + tableName + " SET updated_column = 30 WHERE col1 = 'testValue3'"); + + assertThat(onDelta().executeQuery("SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('default." + tableName + "', 0)")) + .containsOnly( + row("testValue1", 1, "insert", 1L), + row("testValue2", 2, "insert", 1L), + row("testValue3", 3, "insert", 1L), + row("testValue3", 3, "update_preimage", 2L), + row("testValue3", 30, "update_postimage", 2L)); + + // CDF shouldn't be generated when delta.feature.changeDataFeed exists, but delta.enableChangeDataFeed doesn't exist + onDelta().executeQuery("ALTER TABLE default." + tableName + " UNSET TBLPROPERTIES ('delta.enableChangeDataFeed')"); + assertThat(getTablePropertiesOnDelta("default", tableName)) + .contains(entry("delta.feature.changeDataFeed", "supported")) + .doesNotContainKey("delta.enableChangeDataFeed"); + assertThat(onTrino().executeQuery("SHOW CREATE TABLE delta.default." + tableName).getOnlyValue().toString()).doesNotContain("change_data_feed_enabled"); + + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES ('testValue4', 4)"); + assertQueryFailure(() -> onDelta().executeQuery("SELECT * FROM table_changes('default." + tableName + "', 4)")) + .hasMessageMatching("(?s)(.*Error getting change data for range \\[4 , 4] as change data was not\nrecorded for version \\[4].*)"); + + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue5', 5)"); + assertQueryFailure(() -> onDelta().executeQuery("SELECT * FROM table_changes('default." + tableName + "', 5)")) + .hasMessageMatching("(?s)(.*Error getting change data for range \\[5 , 5] as change data was not\nrecorded for version \\[5].*)"); + + // CDF shouldn't be generated when delta.feature.changeDataFeed exists, but delta.enableChangeDataFeed is disabled + onDelta().executeQuery("ALTER TABLE default." + tableName + " SET TBLPROPERTIES ('delta.feature.changeDataFeed'='supported', 'delta.enableChangeDataFeed'=false)"); + assertThat(getTablePropertiesOnDelta("default", tableName)) + .contains(entry("delta.feature.changeDataFeed", "supported")) + .contains(entry("delta.enableChangeDataFeed", "false")); + assertThat(onTrino().executeQuery("SHOW CREATE TABLE delta.default." + tableName).getOnlyValue().toString()).contains("change_data_feed_enabled = false"); + + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES ('testValue7', 7)"); + assertQueryFailure(() -> onDelta().executeQuery("SELECT * FROM table_changes('default." + tableName + "', 7)")) + .hasMessageMatching("(?s)(.*Error getting change data for range \\[7 , 7] as change data was not\nrecorded for version \\[7].*)"); + + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue8', 8)"); + assertQueryFailure(() -> onDelta().executeQuery("SELECT * FROM table_changes('default." 
+ tableName + "', 8)")) + .hasMessageMatching("(?s)(.*Error getting change data for range \\[8 , 8] as change data was not\nrecorded for version \\[8].*)"); + + // Enabling only delta.enableChangeDataFeed without delta.feature.changeDataFeed property is unsupported + } + finally { + dropDeltaTableWithRetry(tableName); + } + } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider") @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testUpdateCdfTableWithNonLowercaseColumn(String columnMappingMode) diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java index ce4d6ab089ff..a9194b2ae1bf 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCheckConstraintCompatibility.java @@ -27,10 +27,12 @@ import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTablePropertiesOnDelta; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.entry; public class TestDeltaLakeCheckConstraintCompatibility extends BaseTestDeltaLakeS3Storage @@ -199,6 +201,45 @@ public void testCheckConstraintMergeCompatibility() } } + @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + public void testCheckConstraintWriterFeature() + { + String tableName = "test_check_constraint_writer_feature_" + randomNameSuffix(); + + onDelta().executeQuery("CREATE TABLE default." + tableName + + "(a INT) " + + "USING DELTA " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + "TBLPROPERTIES ('delta.minWriterVersion'='7')"); + onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD CONSTRAINT a_constraint CHECK (a > 1)"); + try { + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES 2"); + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES 1")) + .hasMessageContaining("Check constraint violation"); + + // delta.feature.checkConstraints still exists even after unsetting the property + onDelta().executeQuery("ALTER TABLE default." + tableName + " UNSET TBLPROPERTIES ('delta.feature.checkConstraints')"); + assertThat(getTablePropertiesOnDelta("default", tableName)) + .contains(entry("delta.feature.checkConstraints", "supported")) + .contains(entry("delta.constraints.a_constraint", "a > 1")); + + // Remove the constraint directly + onDelta().executeQuery("ALTER TABLE default." 
+ tableName + " UNSET TBLPROPERTIES ('delta.constraints.a_constraint')"); + assertThat(getTablePropertiesOnDelta("default", tableName)) + .contains(entry("delta.feature.checkConstraints", "supported")) + .doesNotContainKey("delta.constraints.a_constraint"); + + // CHECK constraints shouldn't be enforced after the constraint property was removed + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES 3"); + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES 4"); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(2), row(3), row(4)); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testCheckConstraintUnknownCondition() diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java index c5e55846b226..298d56132864 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java @@ -39,12 +39,15 @@ import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnNamesOnDelta; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTableCommentOnDelta; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTableCommentOnTrino; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTablePropertiesOnDelta; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTablePropertyOnDelta; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.entry; import static org.testng.Assert.assertEquals; public class TestDeltaLakeColumnMappingMode @@ -79,6 +82,49 @@ public void testColumnMappingModeNone() } } + @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}, dataProvider = "supportedColumnMappingForDmlDataProvider") + public void testColumnMappingModeTableFeature(String mode) + { + String tableName = "test_dl_column_mapping_mode_table_feature" + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (col INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + "TBLPROPERTIES ('delta.feature.columnMapping'='supported', 'delta.columnMapping.mode'='" + mode + "')"); + try { + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES 1"); + assertThat(getTablePropertiesOnDelta("default", tableName)) + .contains(entry("delta.feature.columnMapping", "supported")) + .contains(entry("delta.columnMapping.mode", mode)); + + // delta.feature.columnMapping still exists even after unsetting the property + onDelta().executeQuery("ALTER TABLE default." 
+ tableName + " UNSET TBLPROPERTIES ('delta.feature.columnMapping')"); + assertThat(getTablePropertiesOnDelta("default", tableName)) + .contains(entry("delta.feature.columnMapping", "supported")); + + // Unsetting delta.columnMapping.mode means changing to 'none' column mapping mode + if (mode.equals("none")) { + onDelta().executeQuery("ALTER TABLE default." + tableName + " UNSET TBLPROPERTIES ('delta.columnMapping.mode')"); + assertThat(getTablePropertiesOnDelta("default", tableName)) + .contains(entry("delta.feature.columnMapping", "supported")) + .doesNotContainKey("delta.columnMapping.mode"); + assertThat((String) onTrino().executeQuery("SHOW CREATE TABLE delta.default." + tableName).getOnlyValue()) + .doesNotContain("column_mapping_mode ="); + } + else { + assertQueryFailure(() -> onDelta().executeQuery("ALTER TABLE default." + tableName + " UNSET TBLPROPERTIES ('delta.columnMapping.mode')")) + .hasMessageContaining("Changing column mapping mode from '" + mode + "' to 'none' is not supported"); + assertThat((String) onTrino().executeQuery("SHOW CREATE TABLE delta.default." + tableName).getOnlyValue()) + .contains("column_mapping_mode = '" + mode.toUpperCase(ENGLISH) + "'"); + } + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider") @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testTrinoColumnMappingModeReaderAndWriterVersion(String mode) diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java index 2fbe63add42b..b10f2a63ea6d 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksCheckpointsCompatibility.java @@ -35,6 +35,9 @@ import static io.trino.tempto.assertions.QueryAssert.Row.row; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_104; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_113; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertLastEntryIsCheckpointed; import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertTransactionLogVersion; @@ -132,6 +135,18 @@ public void testDatabricksCanReadTrinoCheckpoint() @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testTrinoUsesCheckpointInterval() + { + trinoUsesCheckpointInterval("'delta.checkpointInterval' = '5'"); + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void 
testTrinoUsesCheckpointIntervalWithTableFeature() + { + trinoUsesCheckpointInterval("'delta.checkpointInterval' = '5', 'delta.feature.columnMapping'='supported'"); + } + + private void trinoUsesCheckpointInterval(String deltaTableProperties) { String tableName = "test_dl_checkpoints_compat_" + randomNameSuffix(); String tableDirectory = "databricks-compatibility-test-" + tableName; @@ -142,8 +157,8 @@ public void testTrinoUsesCheckpointInterval() " USING delta" + " PARTITIONED BY (a_NuMbEr)" + " LOCATION 's3://%s/%s'" + - " TBLPROPERTIES ('delta.checkpointInterval' = '5')", - tableName, bucketName, tableDirectory)); + " TBLPROPERTIES (%s)", + tableName, bucketName, tableDirectory, deltaTableProperties)); try { // validate that we can see the checkpoint interval diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java index bceb68a685b9..da388f1fd42c 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java @@ -40,10 +40,12 @@ import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTablePropertiesOnDelta; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.entry; public class TestDeltaLakeDeleteCompatibility extends BaseTestDeltaLakeS3Storage @@ -117,6 +119,51 @@ public void testDeleteOnAppendOnlyTableFails() onTrino().executeQuery("DROP TABLE " + tableName); } + @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + public void testDeleteOnAppendOnlyWriterFeature() + { + String tableName = "test_delete_on_append_only_feature_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT, b INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + "TBLPROPERTIES ('delta.minWriterVersion'='7', 'delta.appendOnly' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2, 12)"); + assertThat(getTablePropertiesOnDelta("default", tableName)) + .contains(entry("delta.feature.appendOnly", "supported")) + .contains(entry("delta.appendOnly", "true")); + + assertQueryFailure(() -> onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 1")) + .hasMessageContaining("This table is configured to only allow appends"); + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a = 1")) + .hasMessageContaining("Cannot modify rows from a table with 'delta.appendOnly' set to true"); + + assertThat(onDelta().executeQuery("SELECT * FROM default." 
+ tableName)) + .containsOnly(row(1, 11), row(2, 12)); + + // delta.feature.appendOnly still exists even after unsetting the property + onDelta().executeQuery("ALTER TABLE default." + tableName + " UNSET TBLPROPERTIES ('delta.feature.appendOnly')"); + assertThat(getTablePropertiesOnDelta("default", tableName)) + .contains(entry("delta.feature.appendOnly", "supported")) + .contains(entry("delta.appendOnly", "true")); + + // Disable delta.appendOnly property + onDelta().executeQuery("ALTER TABLE default." + tableName + " SET TBLPROPERTIES ('delta.appendOnly'=false)"); + assertThat(getTablePropertiesOnDelta("default", tableName)) + .contains(entry("delta.feature.appendOnly", "supported")) + .contains(entry("delta.appendOnly", "false")); + + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 1"); + onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a = 2"); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).hasNoRows(); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + // OSS Delta doesn't support TRUNCATE TABLE statement @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS}) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) @@ -198,14 +245,14 @@ public void testDeletionVectors(String mode) // TODO https://github.com/trinodb/trino/issues/17063 Use Delta Deletion Vectors for row-level deletes assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (3, 33)")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageContaining("Unsupported writer features: [deletionVectors]"); assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName)) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageContaining("Unsupported writer features: [deletionVectors]"); assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageContaining("Unsupported writer features: [deletionVectors]"); assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " + "ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = -1")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageContaining("Unsupported writer features: [deletionVectors]"); } finally { dropDeltaTableWithRetry("default." + tableName); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeIdentityColumnCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeIdentityColumnCompatibility.java index 8d896615cb4a..d1faff48ab43 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeIdentityColumnCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeIdentityColumnCompatibility.java @@ -93,14 +93,14 @@ public void testIdentityColumnTableFeature() "TBLPROPERTIES ('delta.feature.identityColumns'='supported')"); try { assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." 
+ tableName + " VALUES (1, 1)")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageMatching(".* Writing to tables with identity columns is not supported"); assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET data = 1")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageMatching(".* Writing to tables with identity columns is not supported"); assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName)) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageMatching(".* Writing to tables with identity columns is not supported"); assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " + "ON (t.data = s.data) WHEN MATCHED THEN UPDATE SET data = 1")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + .hasMessageMatching(".* Writing to tables with identity columns is not supported"); } finally { dropDeltaTableWithRetry("default." + tableName); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java index f7b346886a39..3d56259823e1 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeWriteDatabricksCompatibility.java @@ -43,6 +43,7 @@ import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getDatabricksRuntimeVersion; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.skipTestUnlessUnsupportedWriterVersionExists; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -395,6 +396,8 @@ private void testVacuumRemoveChangeDataFeedFiles(Consumer vacuumExecutor @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testVacuumUnsupportedWriterVersion() { + skipTestUnlessUnsupportedWriterVersionExists(); + String tableName = "test_vacuum_unsupported_writer_version_" + randomNameSuffix(); String directoryName = "databricks-compatibility-test-" + tableName; @@ -402,10 +405,31 @@ public void testVacuumUnsupportedWriterVersion() "(a INT)" + "USING DELTA " + "LOCATION '" + ("s3://" + bucketName + "/" + directoryName) + "'" + - "TBLPROPERTIES ('delta.minWriterVersion'='7')"); + "TBLPROPERTIES ('delta.minWriterVersion'='8')"); + + try { + assertThatThrownBy(() -> onTrino().executeQuery("CALL delta.system.vacuum('default', '" + tableName + "', '7d')")) + .hasMessageContaining("Cannot execute vacuum procedure with 8 writer version"); + } + finally { + dropDeltaTableWithRetry("default." 
+ tableName); + } + } + + @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + public void testVacuumUnsupportedWriterFeature() + { + String tableName = "test_vacuum_unsupported_writer_feature_" + randomNameSuffix(); + String directoryName = "databricks-compatibility-test-" + tableName; + + onDelta().executeQuery("CREATE TABLE default." + tableName + + "(a INT)" + + "USING DELTA " + + "LOCATION '" + ("s3://" + bucketName + "/" + directoryName) + "'" + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); try { assertThatThrownBy(() -> onTrino().executeQuery("CALL delta.system.vacuum('default', '" + tableName + "', '7d')")) - .hasMessageContaining("Cannot execute vacuum procedure with 7 writer version"); + .hasMessageContaining("Cannot execute vacuum procedure with [deletionVectors] writer features"); } finally { dropDeltaTableWithRetry("default." + tableName); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java index 4d84b362acf7..c3d9eb1ceec1 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java @@ -22,19 +22,25 @@ import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; +import io.trino.tempto.query.QueryExecutionException; import io.trino.tempto.query.QueryResult; import org.intellij.lang.annotations.Language; +import org.testng.SkipException; import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Map; import java.util.Optional; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.MoreCollectors.onlyElement; +import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; public final class DeltaLakeTestUtils { @@ -64,6 +70,21 @@ public static Optional getDatabricksRuntimeVersion() return Optional.of(DatabricksVersion.parse(version)); } + public static void skipTestUnlessUnsupportedWriterVersionExists() + { + // TODO: This method should be called only once per environment. Consider using a cache or creating a new module like HiveVersionProvider. + String tableName = "test_dl_unsupported_writer_version_" + randomNameSuffix(); + + try { + onDelta().executeQuery("CREATE TABLE default." + tableName + "(col int) USING DELTA TBLPROPERTIES ('delta.minWriterVersion'='8')"); + dropDeltaTableWithRetry("default." + tableName); + } + catch (QueryExecutionException e) { + assertThat(e).hasMessageMatching("(?s).* delta.minWriterVersion needs to be (an integer between \\[1, 7]|one of 1, 2, 3, 4, 5(, 6)?, 7).*"); + throw new SkipException("Cannot test unsupported writer version"); + } + } + public static List getColumnNamesOnDelta(String schemaName, String tableName) { QueryResult result = onDelta().executeQuery("SHOW COLUMNS IN " + schemaName + "." 
+ tableName); @@ -98,6 +119,14 @@ public static String getTableCommentOnDelta(String schemaName, String tableName) .collect(onlyElement()); } + public static Map<String, String> getTablePropertiesOnDelta(String schemaName, String tableName) + { + QueryResult result = onDelta().executeQuery("SHOW TBLPROPERTIES %s.%s".formatted(schemaName, tableName)); + return result.rows().stream() + .map(column -> Map.entry((String) column.get(0), (String) column.get(1))) + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); + } + public static String getTablePropertyOnDelta(String schemaName, String tableName, String propertyName) { QueryResult result = onDelta().executeQuery("SHOW TBLPROPERTIES %s.%s(%s)".formatted(schemaName, tableName, propertyName));