Commit 6f15559: Add support for writer version 7 in Delta Lake

Support the following writer features:
* appendOnly
* invariants
* checkConstraints
* changeDataFeed
* columnMapping

ebyhr committed Sep 14, 2023
1 parent e35c20b · commit 6f15559
Showing 36 changed files with 738 additions and 168 deletions.
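
The gating mechanics behind this feature list: a table on writer version 7 names its required capabilities as writerFeatures entries in its protocol action, and an engine may only write to the table if it understands every declared feature. A minimal, self-contained sketch of such a gate (the supported set mirrors the list above; the helper name follows the unsupportedWriterFeatures method visible in the VacuumProcedure diff below, but this is illustrative rather than the commit's exact code):

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

import java.util.Set;

public final class WriterFeatureGate
{
    // The writer features this commit teaches the Delta Lake connector to honor
    private static final Set<String> SUPPORTED_WRITER_FEATURES = ImmutableSet.of(
            "appendOnly", "invariants", "checkConstraints", "changeDataFeed", "columnMapping");

    private WriterFeatureGate() {}

    // Features the table requires but the engine does not understand; any write
    // path must bail out unless this comes back empty
    public static Set<String> unsupportedWriterFeatures(Set<String> tableWriterFeatures)
    {
        return Sets.difference(tableWriterFeatures, SUPPORTED_WRITER_FEATURES);
    }

    public static void main(String[] args)
    {
        // A table declaring a feature outside the supported set is rejected
        System.out.println(unsupportedWriterFeatures(
                ImmutableSet.of("appendOnly", "deletionVectors"))); // prints [deletionVectors]
    }
}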
========================================
@@ -17,6 +17,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
+import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
 import io.trino.spi.connector.ConnectorInsertTableHandle;
 import io.trino.spi.connector.SchemaTableName;
@@ -30,6 +31,7 @@ public class DeltaLakeInsertTableHandle
     private final SchemaTableName tableName;
     private final String location;
     private final MetadataEntry metadataEntry;
+    private final ProtocolEntry protocolEntry;
     private final List<DeltaLakeColumnHandle> inputColumns;
     private final long readVersion;
     private final boolean retriesEnabled;
@@ -39,12 +41,14 @@ public DeltaLakeInsertTableHandle(
             @JsonProperty("tableName") SchemaTableName tableName,
             @JsonProperty("location") String location,
             @JsonProperty("metadataEntry") MetadataEntry metadataEntry,
+            @JsonProperty("protocolEntry") ProtocolEntry protocolEntry,
             @JsonProperty("inputColumns") List<DeltaLakeColumnHandle> inputColumns,
             @JsonProperty("readVersion") long readVersion,
             @JsonProperty("retriesEnabled") boolean retriesEnabled)
     {
         this.tableName = requireNonNull(tableName, "tableName is null");
         this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
+        this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
         this.inputColumns = ImmutableList.copyOf(inputColumns);
         this.location = requireNonNull(location, "location is null");
         this.readVersion = readVersion;
@@ -69,6 +73,12 @@ public MetadataEntry getMetadataEntry()
         return metadataEntry;
     }
 
+    @JsonProperty
+    public ProtocolEntry getProtocolEntry()
+    {
+        return protocolEntry;
+    }
+
     @JsonProperty
     public List<DeltaLakeColumnHandle> getInputColumns()
     {
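
With the protocol entry now carried by the handle, worker-side code can branch on the table's declared capabilities instead of guessing from the writer version alone. An illustrative fragment (the accessor names appear in the diffs on this page; insertHandle is a hypothetical variable, and Set/ImmutableSet imports are assumed):

ProtocolEntry protocol = insertHandle.getProtocolEntry();
if (protocol.getMinWriterVersion() >= 7) {
    // Version-7 tables spell out capabilities as named writer features
    Set<String> features = protocol.getWriterFeatures().orElse(ImmutableSet.of());
    boolean columnMappingDeclared = features.contains("columnMapping");
}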

[Large diff not rendered by default.]

========================================
@@ -21,6 +21,7 @@
 import io.trino.plugin.deltalake.procedure.DeltaLakeTableExecuteHandle;
 import io.trino.plugin.deltalake.procedure.DeltaTableOptimizeHandle;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
+import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
 import io.trino.plugin.hive.NodeVersion;
 import io.trino.spi.PageIndexerFactory;
 import io.trino.spi.connector.ConnectorInsertTableHandle;
@@ -119,7 +120,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
     {
         DeltaLakeInsertTableHandle tableHandle = (DeltaLakeInsertTableHandle) insertTableHandle;
         MetadataEntry metadataEntry = tableHandle.getMetadataEntry();
-        DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, typeManager);
+        DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, tableHandle.getProtocolEntry(), typeManager);
         return new DeltaLakePageSink(
                 typeManager.getTypeOperators(),
                 tableHandle.getInputColumns(),
@@ -142,7 +143,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
         switch (executeHandle.getProcedureId()) {
             case OPTIMIZE:
                 DeltaTableOptimizeHandle optimizeHandle = (DeltaTableOptimizeHandle) executeHandle.getProcedureHandle();
-                DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(optimizeHandle.getMetadataEntry(), typeManager);
+                DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(optimizeHandle.getMetadataEntry(), optimizeHandle.getProtocolEntry(), typeManager);
                 return new DeltaLakePageSink(
                         typeManager.getTypeOperators(),
                         optimizeHandle.getTableColumns(),
@@ -167,7 +168,7 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
         DeltaLakeMergeTableHandle merge = (DeltaLakeMergeTableHandle) mergeHandle;
         DeltaLakeInsertTableHandle tableHandle = merge.getInsertTableHandle();
         ConnectorPageSink pageSink = createPageSink(transactionHandle, session, tableHandle, pageSinkId);
-        DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(tableHandle.getMetadataEntry(), typeManager);
+        DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager);
 
         return new DeltaLakeMergeSink(
                 typeManager.getTypeOperators(),
@@ -183,7 +184,7 @@ public ConnectorMergeSink createMergeSink(ConnectorTransaction
                 tableHandle.getInputColumns(),
                 domainCompactionThreshold,
                 () -> createCdfPageSink(merge, session),
-                changeDataFeedEnabled(tableHandle.getMetadataEntry()),
+                changeDataFeedEnabled(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry()).orElse(false),
                 parquetSchemaMapping);
     }
 
@@ -192,8 +193,9 @@ private DeltaLakeCdfPageSink createCdfPageSink(
             ConnectorSession session)
     {
         MetadataEntry metadataEntry = mergeTableHandle.getTableHandle().getMetadataEntry();
+        ProtocolEntry protocolEntry = mergeTableHandle.getTableHandle().getProtocolEntry();
         Set<String> partitionKeys = mergeTableHandle.getTableHandle().getMetadataEntry().getOriginalPartitionColumns().stream().collect(toImmutableSet());
-        List<DeltaLakeColumnHandle> tableColumns = extractSchema(metadataEntry, typeManager).stream()
+        List<DeltaLakeColumnHandle> tableColumns = extractSchema(metadataEntry, protocolEntry, typeManager).stream()
                 .map(metadata -> new DeltaLakeColumnHandle(
                         metadata.getName(),
                         metadata.getType(),
@@ -216,7 +218,7 @@ private DeltaLakeCdfPageSink createCdfPageSink(
                 .build();
         Location tableLocation = Location.of(mergeTableHandle.getTableHandle().getLocation());
 
-        DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, typeManager, true);
+        DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, protocolEntry, typeManager, true);
 
         return new DeltaLakeCdfPageSink(
                 typeManager.getTypeOperators(),
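
Note the new .orElse(false) where changeDataFeedEnabled is called: the helper now evidently returns Optional<Boolean>, empty when the table declares nothing either way. One plausible shape for it, assuming the standard delta.enableChangeDataFeed table property and a getConfiguration() map on MetadataEntry (a sketch, not the commit's actual implementation):

static Optional<Boolean> changeDataFeedEnabled(MetadataEntry metadata, ProtocolEntry protocol)
{
    // On a feature-based (writer version 7) table, the property only applies
    // if the changeDataFeed feature is declared in the protocol
    if (protocol.getMinWriterVersion() >= 7
            && !protocol.getWriterFeatures().orElse(ImmutableSet.of()).contains("changeDataFeed")) {
        return Optional.empty();
    }
    String value = metadata.getConfiguration().get("delta.enableChangeDataFeed");
    return Optional.ofNullable(value).map(Boolean::parseBoolean);
}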
========================================
@@ -148,11 +148,11 @@ public ConnectorPageSource createPageSource(
                 .collect(toImmutableList());
 
         Map<String, Optional<String>> partitionKeys = split.getPartitionKeys();
-        ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
+        ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry(), table.getProtocolEntry());
         Optional<List<String>> partitionValues = Optional.empty();
         if (deltaLakeColumns.stream().anyMatch(column -> column.getBaseColumnName().equals(ROW_ID_COLUMN_NAME))) {
             partitionValues = Optional.of(new ArrayList<>());
-            for (DeltaLakeColumnMetadata column : extractSchema(table.getMetadataEntry(), typeManager)) {
+            for (DeltaLakeColumnMetadata column : extractSchema(table.getMetadataEntry(), table.getProtocolEntry(), typeManager)) {
                 Optional<String> value = switch (columnMappingMode) {
                     case NONE:
                         yield partitionKeys.get(column.getName());
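
getColumnMappingMode likewise gains a protocol parameter. A sketch of the resolution under the same assumptions (delta.columnMapping.mode is the standard Delta property with values none, name, and id; the real method in DeltaLakeSchemaSupport may differ):

static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata, ProtocolEntry protocol)
{
    // A version-7 table must declare 'columnMapping' for the mode property to apply
    if (protocol.getMinWriterVersion() >= 7
            && !protocol.getWriterFeatures().orElse(ImmutableSet.of()).contains("columnMapping")) {
        return ColumnMappingMode.NONE;
    }
    String mode = metadata.getConfiguration().getOrDefault("delta.columnMapping.mode", "none");
    return ColumnMappingMode.valueOf(mode.toUpperCase(Locale.ENGLISH));
}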
========================================
@@ -21,6 +21,7 @@
 import io.airlift.json.ObjectMapperProvider;
 import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
+import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
 import io.trino.spi.Location;
 import io.trino.spi.TrinoException;
 import io.trino.spi.type.DecimalType;
@@ -86,14 +87,14 @@ public final class DeltaLakeParquetSchemas
 
     private DeltaLakeParquetSchemas() {}
 
-    public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, TypeManager typeManager)
+    public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager)
     {
-        return createParquetSchemaMapping(metadataEntry, typeManager, false);
+        return createParquetSchemaMapping(metadataEntry, protocolEntry, typeManager, false);
     }
 
-    public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, TypeManager typeManager, boolean addChangeDataFeedFields)
+    public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager, boolean addChangeDataFeedFields)
     {
-        DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry);
+        DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry, protocolEntry);
         return createParquetSchemaMapping(
                 metadataEntry.getSchemaString(),
                 typeManager,
========================================
@@ -173,7 +173,7 @@ private Stream<DeltaLakeSplit> getSplits(
                         .map(DeltaLakeColumnHandle.class::cast))
                 .map(DeltaLakeColumnHandle::getBaseColumnName)
                 .collect(toImmutableSet());
-        List<DeltaLakeColumnMetadata> schema = extractSchema(tableHandle.getMetadataEntry(), typeManager);
+        List<DeltaLakeColumnMetadata> schema = extractSchema(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager);
         List<DeltaLakeColumnMetadata> predicatedColumns = schema.stream()
                 .filter(column -> predicatedColumnNames.contains(column.getName()))
                 .collect(toImmutableList());
========================================
@@ -19,6 +19,7 @@
 import io.airlift.units.DataSize;
 import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
 import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
+import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
 
 import java.util.List;
 import java.util.Optional;
@@ -30,6 +31,7 @@ public class DeltaTableOptimizeHandle
         extends DeltaTableProcedureHandle
 {
     private final MetadataEntry metadataEntry;
+    private final ProtocolEntry protocolEntry;
     private final List<DeltaLakeColumnHandle> tableColumns;
     private final List<String> originalPartitionColumns;
     private final DataSize maxScannedFileSize;
@@ -39,13 +41,15 @@ public class DeltaTableOptimizeHandle
     @JsonCreator
     public DeltaTableOptimizeHandle(
             MetadataEntry metadataEntry,
+            ProtocolEntry protocolEntry,
             List<DeltaLakeColumnHandle> tableColumns,
             List<String> originalPartitionColumns,
             DataSize maxScannedFileSize,
             Optional<Long> currentVersion,
             boolean retriesEnabled)
     {
         this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
+        this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
         this.tableColumns = ImmutableList.copyOf(requireNonNull(tableColumns, "tableColumns is null"));
         this.originalPartitionColumns = ImmutableList.copyOf(requireNonNull(originalPartitionColumns, "originalPartitionColumns is null"));
         this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
@@ -58,6 +62,7 @@ public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
         checkState(this.currentVersion.isEmpty(), "currentVersion already set");
         return new DeltaTableOptimizeHandle(
                 metadataEntry,
+                protocolEntry,
                 tableColumns,
                 originalPartitionColumns,
                 maxScannedFileSize,
@@ -71,6 +76,12 @@ public MetadataEntry getMetadataEntry()
         return metadataEntry;
     }
 
+    @JsonProperty
+    public ProtocolEntry getProtocolEntry()
+    {
+        return protocolEntry;
+    }
+
     @JsonProperty
     public List<DeltaLakeColumnHandle> getTableColumns()
     {
========================================
@@ -14,6 +14,7 @@
 package io.trino.plugin.deltalake.procedure;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import io.airlift.log.Logger;
@@ -60,6 +61,7 @@
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
 import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
+import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
 import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -175,11 +177,15 @@ private void doVacuum(
         accessControl.checkCanDeleteFromTable(null, tableName);
 
         TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(tableName, handle.getLocation(), session);
-        // TODO https://github.com/trinodb/trino/issues/15873 Check writer features when supporting writer version 7
         ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
         if (protocolEntry.getMinWriterVersion() > MAX_WRITER_VERSION) {
             throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.getMinWriterVersion()));
         }
+        Set<String> unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of()));
+        if (!unsupportedWriterFeatures.isEmpty()) {
+            throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(unsupportedWriterFeatures));
+        }
+
         String tableLocation = tableSnapshot.getTableLocation();
         String transactionLogDir = getTransactionLogDir(tableLocation);
         TrinoFileSystem fileSystem = fileSystemFactory.create(session);
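
Taken together, the two checks let vacuum fail fast on tables whose write requirements it does not understand. A hypothetical walk-through (table contents invented): a version-7 table declaring deletionVectors, a feature outside the supported set at this commit, is now rejected with a clear error:

Set<String> declared = ImmutableSet.of("appendOnly", "deletionVectors");
Set<String> unsupported = unsupportedWriterFeatures(declared); // [deletionVectors]
if (!unsupported.isEmpty()) {
    // Fails with: Cannot execute vacuum procedure with [deletionVectors] writer features
    throw new TrinoException(NOT_SUPPORTED,
            "Cannot execute vacuum procedure with %s writer features".formatted(unsupported));
}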
========================================
@@ -76,7 +76,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
         double numRecords = 0L;
 
         MetadataEntry metadata = tableHandle.getMetadataEntry();
-        List<DeltaLakeColumnMetadata> columnMetadata = DeltaLakeSchemaSupport.extractSchema(metadata, typeManager);
+        List<DeltaLakeColumnMetadata> columnMetadata = DeltaLakeSchemaSupport.extractSchema(metadata, tableHandle.getProtocolEntry(), typeManager);
         List<DeltaLakeColumnHandle> columns = columnMetadata.stream()
                 .map(columnMeta -> new DeltaLakeColumnHandle(
                         columnMeta.getName(),
[Remaining file diffs not rendered.]
