From 106f809fb3eda0448a60590d763b89fe8bd75394 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Tue, 14 Jan 2025 15:52:37 -0800 Subject: [PATCH] Load CDK: avro schema matches legacy CDK --- .../load/data/avro/AirbyteTypeToAvroSchema.kt | 16 +++++- .../kotlin/AirbyteTypeToAvroSchemaTest.kt | 51 +++++++++++++++++++ 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt index a129d3c5cf66..6775c014cbe5 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt @@ -24,6 +24,7 @@ import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone import io.airbyte.cdk.load.data.Transformations import io.airbyte.cdk.load.data.UnionType import io.airbyte.cdk.load.data.UnknownType +import io.airbyte.cdk.load.message.Meta import org.apache.avro.LogicalTypes import org.apache.avro.Schema import org.apache.avro.SchemaBuilder @@ -40,8 +41,19 @@ class AirbyteTypeToAvroSchema { SchemaBuilder.record(recordName).namespace(namespaceMangled).fields() airbyteSchema.properties.entries .fold(builder) { acc, (name, field) -> - val converted = convert(field.type, path + name) - val propertySchema = maybeMakeNullable(field, converted) + val propertySchema = + when (name) { + Meta.COLUMN_NAME_AB_EXTRACTED_AT -> + LogicalTypes.timestampMillis() + .addToSchema(Schema.create(Schema.Type.LONG)) + Meta.COLUMN_NAME_AB_RAW_ID -> + LogicalTypes.uuid() + .addToSchema(Schema.create(Schema.Type.STRING)) + else -> { + val converted = convert(field.type, path + name) + maybeMakeNullable(field, converted) + } + } val nameMangled = Transformations.toAvroSafeName(name) 
acc.name(nameMangled).type(propertySchema).let { if (field.nullable) {
diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/test/kotlin/AirbyteTypeToAvroSchemaTest.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/test/kotlin/AirbyteTypeToAvroSchemaTest.kt
index 703121d58dfd..51bc7870630a 100644
--- a/airbyte-cdk/bulk/toolkits/load-avro/src/test/kotlin/AirbyteTypeToAvroSchemaTest.kt
+++ b/airbyte-cdk/bulk/toolkits/load-avro/src/test/kotlin/AirbyteTypeToAvroSchemaTest.kt
@@ -7,6 +7,11 @@ import io.airbyte.cdk.load.data.FieldType
 import io.airbyte.cdk.load.data.ObjectType
 import io.airbyte.cdk.load.data.StringType
 import io.airbyte.cdk.load.data.avro.toAvroSchema
+import io.airbyte.cdk.load.data.withAirbyteMeta
+import io.airbyte.cdk.load.message.Meta
+import org.apache.avro.LogicalTypes
+import org.apache.avro.Schema
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.assertDoesNotThrow
 
@@ -23,4 +28,50 @@ class AirbyteTypeToAvroSchemaTest {
         val descriptor = DestinationStream.Descriptor("test", "stream")
         assertDoesNotThrow { schema.toAvroSchema(descriptor) }
     }
+
+    /**
+     * Converts an empty object schema extended via `withAirbyteMeta()` and checks that each
+     * Airbyte meta column maps to the expected Avro type (and logical type, where asserted).
+     */
+    @Test
+    fun `test airbyte meta schema`() {
+        val schema = ObjectType(linkedMapOf()).withAirbyteMeta()
+        val descriptor = DestinationStream.Descriptor("test", "stream")
+        val avroSchema = schema.toAvroSchema(descriptor)
+
+        // assertEquals takes (expected, actual); expected first keeps failure messages accurate.
+        // Raw id: a string carrying the uuid logical type.
+        val rawIdSchema = avroSchema.getField(Meta.COLUMN_NAME_AB_RAW_ID).schema()
+        assertEquals(Schema.Type.STRING, rawIdSchema.type)
+        assertEquals(LogicalTypes.uuid(), rawIdSchema.logicalType)
+
+        // Extracted-at: a long carrying the timestamp-millis logical type.
+        val extractedAtSchema = avroSchema.getField(Meta.COLUMN_NAME_AB_EXTRACTED_AT).schema()
+        assertEquals(Schema.Type.LONG, extractedAtSchema.type)
+        assertEquals(LogicalTypes.timestampMillis(), extractedAtSchema.logicalType)
+
+        // Generation id: only the base type is asserted.
+        assertEquals(
+            Schema.Type.LONG,
+            avroSchema.getField(Meta.COLUMN_NAME_AB_GENERATION_ID).schema().type
+        )
+
+        val metaSchema = avroSchema.getField(Meta.COLUMN_NAME_AB_META).schema()
+        assertEquals(Schema.Type.RECORD, metaSchema.type)
+        assertEquals(2, metaSchema.fields.size)
+
+        // Meta field 1: `changes` is an array of records with three string fields.
+        val changesSchema = metaSchema.getField("changes").schema()
+        assertEquals(Schema.Type.ARRAY, changesSchema.type)
+
+        val changeSchema = changesSchema.elementType
+        assertEquals(Schema.Type.RECORD, changeSchema.type)
+        assertEquals(3, changeSchema.fields.size)
+        assertEquals(Schema.Type.STRING, changeSchema.getField("field").schema().type)
+        assertEquals(Schema.Type.STRING, changeSchema.getField("change").schema().type)
+        assertEquals(Schema.Type.STRING, changeSchema.getField("reason").schema().type)
+
+        // Meta field 2: `sync_id` is a long.
+        assertEquals(Schema.Type.LONG, metaSchema.getField("sync_id").schema().type)
+    }
 }