Add support for writer version 7 in Delta Lake
Support the following writer features:
* appendOnly
* invariants
* checkConstraints
* changeDataFeed
* columnMapping
ebyhr committed Aug 15, 2023
1 parent cac5b0f commit 47e20fc
Showing 38 changed files with 747 additions and 140 deletions.
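
For orientation: the changes below thread a ProtocolEntry through the table, insert, and optimize handles so writer capabilities can be read from the protocol action instead of being inferred from the writer version alone. The following is a minimal sketch of the shape of such an entry for a writer version 7 table; the record and its factory method are illustrative stand-ins, not Trino's ProtocolEntry API, and the feature names come from the commit description above.

import java.util.Optional;
import java.util.Set;

// Illustrative stand-in for io.trino.plugin.deltalake.transactionlog.ProtocolEntry.
// Field names mirror the Delta protocol action; the record itself is hypothetical.
record ProtocolInfo(
        int minReaderVersion,
        int minWriterVersion,
        Optional<Set<String>> readerFeatures,
        Optional<Set<String>> writerFeatures)
{
    static ProtocolInfo writerVersion7Example()
    {
        // A version 7 protocol lists writer capabilities explicitly instead of
        // implying them from the version number alone.
        return new ProtocolInfo(
                1,
                7,
                Optional.empty(),
                Optional.of(Set.of(
                        "appendOnly",
                        "invariants",
                        "checkConstraints",
                        "changeDataFeed",
                        "columnMapping")));
    }
}
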
@@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.SchemaTableName;

@@ -30,6 +31,7 @@ public class DeltaLakeInsertTableHandle
private final SchemaTableName tableName;
private final String location;
private final MetadataEntry metadataEntry;
private final ProtocolEntry protocolEntry;
private final List<DeltaLakeColumnHandle> inputColumns;
private final long readVersion;
private final boolean retriesEnabled;
@@ -39,12 +41,14 @@ public DeltaLakeInsertTableHandle(
@JsonProperty("tableName") SchemaTableName tableName,
@JsonProperty("location") String location,
@JsonProperty("metadataEntry") MetadataEntry metadataEntry,
@JsonProperty("protocolEntry") ProtocolEntry protocolEntry,
@JsonProperty("inputColumns") List<DeltaLakeColumnHandle> inputColumns,
@JsonProperty("readVersion") long readVersion,
@JsonProperty("retriesEnabled") boolean retriesEnabled)
{
this.tableName = requireNonNull(tableName, "tableName is null");
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
this.inputColumns = ImmutableList.copyOf(inputColumns);
this.location = requireNonNull(location, "location is null");
this.readVersion = readVersion;
@@ -69,6 +73,12 @@ public MetadataEntry getMetadataEntry()
return metadataEntry;
}

@JsonProperty
public ProtocolEntry getProtocolEntry()
{
return protocolEntry;
}

@JsonProperty
public List<DeltaLakeColumnHandle> getInputColumns()
{

Large diffs are not rendered by default.

@@ -119,7 +119,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
{
DeltaLakeInsertTableHandle tableHandle = (DeltaLakeInsertTableHandle) insertTableHandle;
MetadataEntry metadataEntry = tableHandle.getMetadataEntry();
DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, typeManager);
DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, tableHandle.getProtocolEntry(), typeManager);
return new DeltaLakePageSink(
typeManager.getTypeOperators(),
tableHandle.getInputColumns(),
@@ -142,7 +142,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
switch (executeHandle.getProcedureId()) {
case OPTIMIZE:
DeltaTableOptimizeHandle optimizeHandle = (DeltaTableOptimizeHandle) executeHandle.getProcedureHandle();
DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(optimizeHandle.getMetadataEntry(), typeManager);
DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(optimizeHandle.getMetadataEntry(), optimizeHandle.getProtocolEntry(), typeManager);
return new DeltaLakePageSink(
typeManager.getTypeOperators(),
optimizeHandle.getTableColumns(),
@@ -167,7 +167,7 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
DeltaLakeMergeTableHandle merge = (DeltaLakeMergeTableHandle) mergeHandle;
DeltaLakeInsertTableHandle tableHandle = merge.getInsertTableHandle();
ConnectorPageSink pageSink = createPageSink(transactionHandle, session, tableHandle, pageSinkId);
DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(tableHandle.getMetadataEntry(), typeManager);
DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager);

return new DeltaLakeMergeSink(
typeManager.getTypeOperators(),
@@ -183,7 +183,7 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
tableHandle.getInputColumns(),
domainCompactionThreshold,
() -> createCdfPageSink(merge, session),
changeDataFeedEnabled(tableHandle.getMetadataEntry()),
changeDataFeedEnabled(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry()).orElse(false),
parquetSchemaMapping);
}
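
The merge sink above now resolves the change data feed flag with .orElse(false), which suggests the helper became tri-state once the protocol entry is consulted. A rough sketch of that shape only, assuming the Delta table property name delta.enableChangeDataFeed and a simplified signature over a raw configuration map rather than Trino's MetadataEntry/ProtocolEntry pair:

import java.util.Map;
import java.util.Optional;

final class ChangeDataFeed
{
    private ChangeDataFeed() {}

    // Empty means "property not set", so the caller chooses the default (.orElse(false) above).
    static Optional<Boolean> changeDataFeedEnabled(Map<String, String> tableConfiguration)
    {
        return Optional.ofNullable(tableConfiguration.get("delta.enableChangeDataFeed"))
                .map(Boolean::parseBoolean);
    }
}
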

@@ -193,7 +193,7 @@ private DeltaLakeCdfPageSink createCdfPageSink(
{
MetadataEntry metadataEntry = mergeTableHandle.getTableHandle().getMetadataEntry();
Set<String> partitionKeys = mergeTableHandle.getTableHandle().getMetadataEntry().getOriginalPartitionColumns().stream().collect(toImmutableSet());
List<DeltaLakeColumnHandle> tableColumns = extractSchema(metadataEntry, typeManager).stream()
List<DeltaLakeColumnHandle> tableColumns = extractSchema(metadataEntry, mergeTableHandle.getTableHandle().getProtocolEntry(), typeManager).stream()
.map(metadata -> new DeltaLakeColumnHandle(
metadata.getName(),
metadata.getType(),
@@ -216,7 +216,7 @@ private DeltaLakeCdfPageSink createCdfPageSink(
.build();
Location tableLocation = Location.of(mergeTableHandle.getTableHandle().getLocation());

DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, typeManager, true);
DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, mergeTableHandle.getTableHandle().getProtocolEntry(), typeManager, true);

return new DeltaLakeCdfPageSink(
typeManager.getTypeOperators(),
@@ -141,7 +141,7 @@ public ConnectorPageSource createPageSource(
Optional<List<String>> partitionValues = Optional.empty();
if (deltaLakeColumns.stream().anyMatch(column -> column.getBaseColumnName().equals(ROW_ID_COLUMN_NAME))) {
partitionValues = Optional.of(new ArrayList<>());
for (DeltaLakeColumnMetadata column : extractSchema(table.getMetadataEntry(), typeManager)) {
for (DeltaLakeColumnMetadata column : extractSchema(table.getMetadataEntry(), table.getProtocolEntry(), typeManager)) {
Optional<String> value = partitionKeys.get(column.getName());
if (value != null) {
partitionValues.get().add(value.orElse(null));
@@ -183,7 +183,7 @@ public ConnectorPageSource createPageSource(
.withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session))
.withUseColumnIndex(isParquetUseColumnIndex(session));

ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry(), table.getProtocolEntry());
Map<Integer, String> parquetFieldIdToName = columnMappingMode == ColumnMappingMode.ID ? loadParquetIdAndNameMapping(inputFile, options) : ImmutableMap.of();

ImmutableSet.Builder<String> missingColumnNames = ImmutableSet.builder();
@@ -21,6 +21,7 @@
import io.airlift.json.ObjectMapperProvider;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.spi.Location;
import io.trino.spi.TrinoException;
import io.trino.spi.type.DecimalType;
@@ -86,14 +87,14 @@ public final class DeltaLakeParquetSchemas

private DeltaLakeParquetSchemas() {}

public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, TypeManager typeManager)
public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager)
{
return createParquetSchemaMapping(metadataEntry, typeManager, false);
return createParquetSchemaMapping(metadataEntry, protocolEntry, typeManager, false);
}

public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, TypeManager typeManager, boolean addChangeDataFeedFields)
public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager, boolean addChangeDataFeedFields)
{
DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry);
DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry, protocolEntry);
return createParquetSchemaMapping(
metadataEntry.getSchemaString(),
typeManager,
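
createParquetSchemaMapping now derives the column mapping mode with both the metadata and protocol entries in hand. As a rough sketch only: column mapping mode is a table property in the Delta protocol (key delta.columnMapping.mode), while the enum and method below are simplified assumptions rather than the DeltaLakeSchemaSupport API; the real helper also receives the ProtocolEntry so declared features can be validated.

import java.util.Locale;
import java.util.Map;

final class ColumnMappingModes
{
    enum Mode { NONE, NAME, ID, UNKNOWN }

    private ColumnMappingModes() {}

    // Simplified: reads the mode from the table configuration; the real code also
    // consults the ProtocolEntry so an undeclared or unsupported mode can be rejected.
    static Mode getColumnMappingMode(Map<String, String> tableConfiguration)
    {
        String mode = tableConfiguration.getOrDefault("delta.columnMapping.mode", "none");
        return switch (mode.toLowerCase(Locale.ENGLISH)) {
            case "none" -> Mode.NONE;
            case "name" -> Mode.NAME;
            case "id" -> Mode.ID;
            default -> Mode.UNKNOWN;
        };
    }
}
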
@@ -173,7 +173,7 @@ private Stream<DeltaLakeSplit> getSplits(
.map(DeltaLakeColumnHandle.class::cast))
.map(DeltaLakeColumnHandle::getBaseColumnName)
.collect(toImmutableSet());
List<DeltaLakeColumnMetadata> schema = extractSchema(tableHandle.getMetadataEntry(), typeManager);
List<DeltaLakeColumnMetadata> schema = extractSchema(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager);
List<DeltaLakeColumnMetadata> predicatedColumns = schema.stream()
.filter(column -> predicatedColumnNames.contains(column.getName()))
.collect(toImmutableList());
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;

@@ -46,6 +47,7 @@ public enum WriteType
private final boolean managed;
private final String location;
private final MetadataEntry metadataEntry;
private final ProtocolEntry protocolEntry;
private final TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint;
private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
private final Optional<WriteType> writeType;
@@ -73,6 +75,7 @@ public DeltaLakeTableHandle(
@JsonProperty("managed") boolean managed,
@JsonProperty("location") String location,
@JsonProperty("metadataEntry") MetadataEntry metadataEntry,
@JsonProperty("protocolEntry") ProtocolEntry protocolEntry,
@JsonProperty("enforcedPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
@JsonProperty("nonPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
@JsonProperty("writeType") Optional<WriteType> writeType,
@@ -88,6 +91,7 @@ public DeltaLakeTableHandle(
managed,
location,
metadataEntry,
protocolEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
ImmutableSet.of(),
@@ -107,6 +111,7 @@ public DeltaLakeTableHandle(
boolean managed,
String location,
MetadataEntry metadataEntry,
ProtocolEntry protocolEntry,
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Set<DeltaLakeColumnHandle> constraintColumns,
@@ -124,6 +129,7 @@ public DeltaLakeTableHandle(
this.managed = managed;
this.location = requireNonNull(location, "location is null");
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
this.enforcedPartitionConstraint = requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null");
this.nonPartitionConstraint = requireNonNull(nonPartitionConstraint, "nonPartitionConstraint is null");
this.writeType = requireNonNull(writeType, "writeType is null");
@@ -147,6 +153,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> proj
managed,
location,
metadataEntry,
protocolEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
constraintColumns,
@@ -168,6 +175,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
managed,
location,
metadataEntry,
protocolEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
constraintColumns,
@@ -234,6 +242,12 @@ public MetadataEntry getMetadataEntry()
return metadataEntry;
}

@JsonProperty
public ProtocolEntry getProtocolEntry()
{
return protocolEntry;
}

@JsonProperty
public TupleDomain<DeltaLakeColumnHandle> getEnforcedPartitionConstraint()
{
@@ -324,6 +338,7 @@ public boolean equals(Object o)
managed == that.managed &&
Objects.equals(location, that.location) &&
Objects.equals(metadataEntry, that.metadataEntry) &&
Objects.equals(protocolEntry, that.protocolEntry) &&
Objects.equals(enforcedPartitionConstraint, that.enforcedPartitionConstraint) &&
Objects.equals(nonPartitionConstraint, that.nonPartitionConstraint) &&
Objects.equals(writeType, that.writeType) &&
@@ -344,6 +359,7 @@ public int hashCode()
managed,
location,
metadataEntry,
protocolEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
writeType,
@@ -19,6 +19,7 @@
import io.airlift.units.DataSize;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;

import java.util.List;
import java.util.Optional;
@@ -30,6 +31,7 @@ public class DeltaTableOptimizeHandle
extends DeltaTableProcedureHandle
{
private final MetadataEntry metadataEntry;
private final ProtocolEntry protocolEntry;
private final List<DeltaLakeColumnHandle> tableColumns;
private final List<String> originalPartitionColumns;
private final DataSize maxScannedFileSize;
@@ -39,13 +41,15 @@ public class DeltaTableOptimizeHandle
@JsonCreator
public DeltaTableOptimizeHandle(
MetadataEntry metadataEntry,
ProtocolEntry protocolEntry,
List<DeltaLakeColumnHandle> tableColumns,
List<String> originalPartitionColumns,
DataSize maxScannedFileSize,
Optional<Long> currentVersion,
boolean retriesEnabled)
{
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
this.tableColumns = ImmutableList.copyOf(requireNonNull(tableColumns, "tableColumns is null"));
this.originalPartitionColumns = ImmutableList.copyOf(requireNonNull(originalPartitionColumns, "originalPartitionColumns is null"));
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
@@ -58,6 +62,7 @@ public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
checkState(this.currentVersion.isEmpty(), "currentVersion already set");
return new DeltaTableOptimizeHandle(
metadataEntry,
protocolEntry,
tableColumns,
originalPartitionColumns,
maxScannedFileSize,
@@ -71,6 +76,12 @@ public MetadataEntry getMetadataEntry()
return metadataEntry;
}

@JsonProperty
public ProtocolEntry getProtocolEntry()
{
return protocolEntry;
}

@JsonProperty
public List<DeltaLakeColumnHandle> getTableColumns()
{
@@ -14,6 +14,7 @@
package io.trino.plugin.deltalake.procedure;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.airlift.log.Logger;
@@ -60,6 +61,7 @@
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -175,11 +177,15 @@ private void doVacuum(
accessControl.checkCanDeleteFromTable(null, tableName);

TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(tableName, handle.getLocation(), session);
// TODO https://github.com/trinodb/trino/issues/15873 Check writer features when supporting writer version 7
ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
if (protocolEntry.getMinWriterVersion() > MAX_WRITER_VERSION) {
throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.getMinWriterVersion()));
}
Set<String> unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of()));
if (!unsupportedWriterFeatures.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(unsupportedWriterFeatures));
}

String tableLocation = tableSnapshot.getTableLocation();
String transactionLogDir = getTransactionLogDir(tableLocation);
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
Expand Down
@@ -76,7 +76,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
double numRecords = 0L;

MetadataEntry metadata = tableHandle.getMetadataEntry();
List<DeltaLakeColumnMetadata> columnMetadata = DeltaLakeSchemaSupport.extractSchema(metadata, typeManager);
List<DeltaLakeColumnMetadata> columnMetadata = DeltaLakeSchemaSupport.extractSchema(metadata, tableHandle.getProtocolEntry(), typeManager);
List<DeltaLakeColumnHandle> columns = columnMetadata.stream()
.map(columnMeta -> new DeltaLakeColumnHandle(
columnMeta.getName(),