Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(datahub-client): improve Avro schema conversions #12141

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -23,6 +23,8 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {

/** Single mapper instance shared by the class. NOTE(review): its call sites are outside this view. */
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
 * Lookup from a string key to a supplier of {@code SchemaFieldDataType.Type}; assigned in the
 * static initializer. Presumably keyed by Avro logical-type name -- TODO confirm against the
 * initializer body.
 */
private static final Map<String, Supplier<SchemaFieldDataType.Type>> LOGICAL_TYPE_MAPPING;
/** Name given to the synthetic {@code Schema.Field} built for an array's element schema. */
public static final String ARRAY_ITEMS_FIELD_NAME = "items";
/** Name given to the synthetic {@code Schema.Field} built for a map's value schema. */
public static final String MAP_VALUE_FIELD_NAME = "value";

static {
Map<String, Supplier<SchemaFieldDataType.Type>> logicalTypeMap = new HashMap<>();
Expand Down Expand Up @@ -333,41 +335,38 @@ private void processArrayField(
Schema elementSchema = arraySchema.getElementType();
String elementType = getDiscriminatedType(elementSchema);

log.debug("Array Field Path before expand: {}", fieldPath.asString());
fieldPath = fieldPath.expandType("array", arraySchema);
log.debug("Array Field Path after expand: {}", fieldPath.asString());

// Set parent type for proper array handling
DataHubType arrayDataHubType = new DataHubType(ArrayType.class, elementType);

SchemaField arrayField =
new SchemaField()
.setFieldPath(fieldPath.asString())
.setType(arrayDataHubType.asSchemaFieldType())
.setNativeDataType("array(" + elementType + ")")
.setNullable(isNullable || defaultNullable)
.setIsPartOfKey(fieldPath.isKeySchema());

populateCommonProperties(field, arrayField);

fields.add(arrayField);

// Process element type if it's complex
if (elementSchema.getType() == Schema.Type.RECORD
|| elementSchema.getType() == Schema.Type.ARRAY
|| elementSchema.getType() == Schema.Type.MAP
|| elementSchema.getType() == Schema.Type.UNION) {
log.debug("Array Field Path before expand: {}", fieldPath.asString());
fieldPath = fieldPath.popLast();
fieldPath =
fieldPath.clonePlus(
new FieldElement(Collections.singletonList("array"), new ArrayList<>(), null, null));
Schema.Field elementField =
new Schema.Field(
field.name(),
ARRAY_ITEMS_FIELD_NAME,
elementSchema,
elementSchema.getDoc() != null ? elementSchema.getDoc() : field.doc(),
null // TODO: What is the default value for an array element?
);
processField(elementField, fieldPath, defaultNullable, fields, isNullable, arrayDataHubType);
} else {

SchemaField arrayField =
new SchemaField()
.setFieldPath(fieldPath.asString())
.setType(arrayDataHubType.asSchemaFieldType())
.setNativeDataType("array(" + elementType + ")")
.setNullable(isNullable || defaultNullable)
.setIsPartOfKey(fieldPath.isKeySchema());

populateCommonProperties(field, arrayField);
log.debug("Array field path: {} with doc: {}", fieldPath.asString(), field.doc());
fields.add(arrayField);
processField(elementField, fieldPath, defaultNullable, fields, true);
}
}

Expand All @@ -384,7 +383,20 @@ private void processMapField(
String valueType = getDiscriminatedType(valueSchema);

DataHubType mapDataHubType = new DataHubType(MapType.class, valueType);
log.debug("Map Field Path before expand: {}", fieldPath.asString());
fieldPath = fieldPath.expandType("map", mapSchema);
log.debug("Map Field Path fater expand: {}", fieldPath.asString());

SchemaField mapField =
new SchemaField()
.setFieldPath(fieldPath.asString())
.setType(mapDataHubType.asSchemaFieldType())
.setNativeDataType("map<string," + valueType + ">")
.setNullable(isNullable || defaultNullable)
.setIsPartOfKey(fieldPath.isKeySchema());

populateCommonProperties(field, mapField);
fields.add(mapField);

// Process value type if it's complex
if (valueSchema.getType() == Schema.Type.RECORD
Expand All @@ -393,29 +405,12 @@ private void processMapField(
|| valueSchema.getType() == Schema.Type.UNION) {
Schema.Field valueField =
new Schema.Field(
field.name(),
MAP_VALUE_FIELD_NAME,
valueSchema,
valueSchema.getDoc() != null ? valueSchema.getDoc() : field.doc(),
null // TODO: What is the default value for a map value?
); // Nullability for map values follows the nullability of the map itself
FieldPath valueFieldPath =
fieldPath
.popLast()
.clonePlus(
new FieldElement(
Collections.singletonList("map"), new ArrayList<>(), null, null));
processField(valueField, valueFieldPath, defaultNullable, fields, isNullable, mapDataHubType);
} else {
SchemaField mapField =
new SchemaField()
.setFieldPath(fieldPath.asString())
.setType(mapDataHubType.asSchemaFieldType())
.setNativeDataType("map<string," + valueType + ">")
.setNullable(isNullable || defaultNullable)
.setIsPartOfKey(fieldPath.isKeySchema());

populateCommonProperties(field, mapField);
fields.add(mapField);
processField(valueField, fieldPath, defaultNullable, fields, true);
}
}

Expand Down Expand Up @@ -452,6 +447,7 @@ private void processUnionField(

// Otherwise, process as a true union type
DataHubType unionDataHubType = new DataHubType(UnionType.class, discriminatedType);
log.debug("Union Field Path before expand: {}", fieldPath.asString());
FieldPath unionFieldPath = fieldPath.expandType("union", field.schema().toString());
log.debug("Union Field Path after expand: {}", unionFieldPath.asString());

Expand All @@ -475,18 +471,13 @@ private void processUnionField(
int typeIndex = 0;
for (Schema unionSchema : unionTypes) {
if (unionSchema.getType() != Schema.Type.NULL) {
log.debug("TypeIndex: {}, Field path : {}", typeIndex, fieldPath.asString());
String unionFieldName = field.name();
FieldPath indexedFieldPath = fieldPath.popLast();
indexedFieldPath =
indexedFieldPath.clonePlus(
new FieldElement(
Collections.singletonList("union"), new ArrayList<>(), null, null));
log.debug("TypeIndex: {}, Indexed Field path : {}", typeIndex, indexedFieldPath.asString());
// FieldPath unionFieldPath =
// fieldPath.expandType(getDiscriminatedType(unionSchema),
// unionSchema.toString());
log.debug("TypeIndex: {}, Union Field path : {}", typeIndex, unionFieldPath.asString());
String unionFieldName = field.name();

Schema.Field unionFieldInner =
new Schema.Field(
unionFieldName,
Expand Down Expand Up @@ -580,6 +571,13 @@ private String getDiscriminatedType(Schema schema) {
} else {
return schema.getFullName();
}
} else if (schema.getType() == Schema.Type.UNION
&& schema.getTypes().size() == 2
&& schema.getTypes().stream().anyMatch(s -> s.getType() == Schema.Type.NULL)) {
// If this is a union with null, we want to use the non-null type for the discriminated type
Schema nonNullSchema =
schema.getTypes().stream().filter(s -> s.getType() != Schema.Type.NULL).findFirst().get();
return nonNullSchema.getFullName();
}
return schema.getType().getName().toLowerCase();
}
Expand Down
Loading
Loading