Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delta code cleanup #18879

Merged
merged 3 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata)
/**
 * Returns the highest column id recorded in the Delta table metadata
 * (the {@code delta.columnMapping.maxColumnId} table property used by column mapping).
 *
 * @throws NullPointerException if the configuration property is absent
 * @throws NumberFormatException if the property value is not a valid integer
 */
public static int getMaxColumnId(MetadataEntry metadata)
{
    String maxColumnId = metadata.getConfiguration().get(MAX_COLUMN_ID_CONFIGURATION_KEY);
    // The message is a constant concatenation, so building it eagerly is effectively free;
    // no need for the Supplier<String> overload of requireNonNull here.
    requireNonNull(maxColumnId, MAX_COLUMN_ID_CONFIGURATION_KEY + " metadata configuration property not found");
    return Integer.parseInt(maxColumnId);
}

Expand Down Expand Up @@ -569,53 +569,36 @@ private static Type buildType(TypeManager typeManager, JsonNode typeNode, boolea
if (primitiveType.startsWith("decimal")) {
return typeManager.fromSqlType(primitiveType);
}
switch (primitiveType) {
case "string":
return VARCHAR;
case "long":
return BIGINT;
case "integer":
return INTEGER;
case "short":
return SMALLINT;
case "byte":
return TINYINT;
case "float":
return REAL;
case "double":
return DOUBLE;
case "boolean":
return BOOLEAN;
case "binary":
return VARBINARY;
case "date":
return DATE;
case "timestamp_ntz":
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#timestamp-without-timezone-timestampntz
return TIMESTAMP_MICROS;
case "timestamp":
// Spark/DeltaLake stores timestamps in UTC, but renders them in session time zone.
// For more info, see https://delta-users.slack.com/archives/GKTUWT03T/p1585760533005400
// and https://cwiki.apache.org/confluence/display/Hive/Different+TIMESTAMP+types
return TIMESTAMP_TZ_MILLIS;
default:
throw new TypeNotFoundException(new TypeSignature(primitiveType));
}
return switch (primitiveType) {
case "string" -> VARCHAR;
case "long" -> BIGINT;
case "integer" -> INTEGER;
case "short" -> SMALLINT;
case "byte" -> TINYINT;
case "float" -> REAL;
case "double" -> DOUBLE;
case "boolean" -> BOOLEAN;
case "binary" -> VARBINARY;
case "date" -> DATE;
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#timestamp-without-timezone-timestampntz
case "timestamp_ntz" -> TIMESTAMP_MICROS;
// Spark/DeltaLake stores timestamps in UTC, but renders them in session time zone.
// For more info, see https://delta-users.slack.com/archives/GKTUWT03T/p1585760533005400
// and https://cwiki.apache.org/confluence/display/Hive/Different+TIMESTAMP+types
case "timestamp" -> TIMESTAMP_TZ_MILLIS;
default -> throw new TypeNotFoundException(new TypeSignature(primitiveType));
};
}

/**
 * Maps a Delta Lake container type node ("array", "map" or "struct") to the
 * corresponding Trino {@link Type} by delegating to the matching builder.
 *
 * @throws TypeNotFoundException if the node's "type" field is not a known container kind
 */
private static Type buildContainerType(TypeManager typeManager, JsonNode typeNode, boolean usePhysicalName)
{
    String containerType = typeNode.get("type").asText();
    return switch (containerType) {
        case "array" -> buildArrayType(typeManager, typeNode, usePhysicalName);
        case "map" -> buildMapType(typeManager, typeNode, usePhysicalName);
        case "struct" -> buildRowType(typeManager, typeNode, usePhysicalName);
        default -> throw new TypeNotFoundException(new TypeSignature(containerType));
    };
}

private static RowType buildRowType(TypeManager typeManager, JsonNode typeNode, boolean usePhysicalName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ private static List<DeltaLakeTransactionLogEntry> getJsonEntries(long startVersi
return TransactionLogTail.loadNewTail(fileSystem, tableSnapshot.getTableLocation(), Optional.of(startVersion), Optional.of(endVersion)).getFileEntries();
}

public static <T> String canonicalizeColumnName(String columnName)
public static String canonicalizeColumnName(String columnName)
{
return columnName.toLowerCase(Locale.ENGLISH);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,29 +205,14 @@ public CheckpointEntryIterator(

/**
 * Builds the column handle used to read the checkpoint column corresponding to the
 * given transaction-log entry type, resolving the entry's row type via the schema manager.
 */
private DeltaLakeColumnHandle buildColumnHandle(EntryType entryType, CheckpointSchemaManager schemaManager, MetadataEntry metadataEntry)
{
    // Switch expression must be exhaustive over EntryType, so a new enum constant
    // becomes a compile-time error here instead of a runtime IllegalArgumentException.
    Type type = switch (entryType) {
        case TRANSACTION -> schemaManager.getTxnEntryType();
        case ADD -> schemaManager.getAddEntryType(metadataEntry, true, true);
        case REMOVE -> schemaManager.getRemoveEntryType();
        case METADATA -> schemaManager.getMetadataEntryType();
        case PROTOCOL -> schemaManager.getProtocolEntryType(true, true);
        case COMMIT -> schemaManager.getCommitInfoEntryType();
    };
    return new DeltaLakeColumnHandle(entryType.getColumnName(), type, OptionalInt.empty(), entryType.getColumnName(), type, REGULAR, Optional.empty());
}

Expand All @@ -243,26 +228,23 @@ private TupleDomain<HiveColumnHandle> buildTupleDomainColumnHandle(EntryType ent
String field;
Type type;
switch (entryType) {
case COMMIT:
case TRANSACTION:
case COMMIT, TRANSACTION -> {
field = "version";
type = BIGINT;
break;
case ADD:
case REMOVE:
}
case ADD, REMOVE -> {
field = "path";
type = VARCHAR;
break;
case METADATA:
}
case METADATA -> {
field = "id";
type = VARCHAR;
break;
case PROTOCOL:
}
case PROTOCOL -> {
field = "minReaderVersion";
type = BIGINT;
break;
default:
throw new IllegalArgumentException("Unsupported Delta Lake checkpoint entry type: " + entryType);
}
default -> throw new IllegalArgumentException("Unsupported Delta Lake checkpoint entry type: " + entryType);
}
HiveColumnHandle handle = new HiveColumnHandle(
column.getBaseColumnName(),
Expand Down