Skip to content

Commit

Permalink
Bulk Load CDK: Identity AirbyteValue Nulling/Capturing Mapper (#47197)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Oct 23, 2024
1 parent 017d7cc commit 15ff3ee
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
Instant.ofEpochMilli(System.currentTimeMillis()),
stream.generationId,
it.data as ObjectValue,
OutputRecord.Meta(changes = it.meta?.changes, syncId = stream.syncId),
OutputRecord.Meta(
changes = it.meta?.changes ?: mutableListOf(),
syncId = stream.syncId
),
)
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason

/**
 * Walks an [AirbyteValue] in lock-step with its [AirbyteType] schema and rebuilds it.
 *
 * Every `mapXxx` hook defaults to returning its input unchanged (objects with a schema and arrays
 * with a schema are rebuilt by recursing into their children), so the base class is an identity
 * transformation. Subclasses override individual hooks to transform specific types.
 *
 * If dispatching or a hook throws, the value at that path is replaced by the result of [mapNull]
 * and a [Change.NULLED] entry is appended to [meta]'s `changes`.
 *
 * @property meta record metadata whose mutable `changes` list receives one entry per nulled value.
 */
open class AirbyteValueIdentityMapper(
    open val meta: DestinationRecord.Meta,
) {
    /**
     * Appends a [Change.NULLED] change for [path] to [meta]. The path segments are joined with
     * `.` to form the field name.
     */
    private fun collectFailure(
        path: List<String>,
        reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR
    ) {
        meta.changes.add(DestinationRecord.Change(path.joinToString("."), Change.NULLED, reason))
    }

    /**
     * Maps [value] according to [schema], dispatching to the hook that matches the schema type.
     *
     * @param value the value to map.
     * @param schema the declared type of [value]; it selects the hook and the cast applied.
     * @param path segments locating [value] inside the record; used only for change reporting.
     * @param nullable whether the schema permits a null here. When `true`, a [NullValue] is passed
     * straight to [mapNull] and is NOT reported as a change. Defaults to `false`, which keeps the
     * previous behaviour for existing callers.
     * @return the mapped value, or the result of [mapNull] if mapping failed.
     */
    fun map(
        value: AirbyteValue,
        schema: AirbyteType,
        path: List<String> = emptyList(),
        nullable: Boolean = false
    ): AirbyteValue =
        if (nullable && value is NullValue) {
            // A legitimately-null field must not reach the casts below: `NullValue as XxxValue`
            // would throw and be misreported as a value this mapper nulled.
            mapNull(path)
        } else {
            try {
                when (schema) {
                    is ObjectType -> mapObject(value as ObjectValue, schema, path)
                    is ObjectTypeWithoutSchema ->
                        mapObjectWithoutSchema(value as ObjectValue, schema, path)
                    is ObjectTypeWithEmptySchema ->
                        mapObjectWithEmptySchema(value as ObjectValue, schema, path)
                    is ArrayType -> mapArray(value as ArrayValue, schema, path)
                    is ArrayTypeWithoutSchema ->
                        mapArrayWithoutSchema(value as ArrayValue, schema, path)
                    is UnionType -> mapUnion(value, schema, path)
                    is BooleanType -> mapBoolean(value as BooleanValue, path)
                    is NumberType -> mapNumber(value as NumberValue, path)
                    is StringType -> mapString(value as StringValue, path)
                    is IntegerType -> mapInteger(value as IntegerValue, path)
                    is DateType -> mapDate(value as DateValue, path)
                    is TimeTypeWithTimezone -> mapTimeWithTimezone(value as TimeValue, path)
                    is TimeTypeWithoutTimezone -> mapTimeWithoutTimezone(value as TimeValue, path)
                    is TimestampTypeWithTimezone ->
                        mapTimestampWithTimezone(value as TimestampValue, path)
                    is TimestampTypeWithoutTimezone ->
                        mapTimestampWithoutTimezone(value as TimestampValue, path)
                    is NullType -> mapNull(path)
                    is UnknownType -> mapUnknown(value as UnknownValue, path)
                }
            } catch (e: Exception) {
                // Deliberate best-effort: a value that does not match its schema (failed cast) or
                // a hook that throws nulls only this value and is recorded, instead of failing
                // the whole record.
                collectFailure(path)
                mapNull(path)
            }
        }

    /**
     * Rebuilds an object by mapping every property declared in [schema], in schema order.
     * Properties absent from [value] are mapped as [NullValue]; entries of [value] that are not
     * declared in [schema] are not copied to the result.
     */
    open fun mapObject(value: ObjectValue, schema: ObjectType, path: List<String>): AirbyteValue {
        val values = LinkedHashMap<String, AirbyteValue>()
        schema.properties.forEach { (name, field) ->
            values[name] =
                map(value.values[name] ?: NullValue, field.type, path + name, field.nullable)
        }
        return ObjectValue(values)
    }

    /** Identity hook for objects whose type carries no schema. */
    open fun mapObjectWithoutSchema(
        value: ObjectValue,
        schema: ObjectTypeWithoutSchema,
        path: List<String>
    ): AirbyteValue = value

    /** Identity hook for objects whose type carries an empty schema. */
    open fun mapObjectWithEmptySchema(
        value: ObjectValue,
        schema: ObjectTypeWithEmptySchema,
        path: List<String>
    ): AirbyteValue = value

    /**
     * Rebuilds an array by mapping each element against the array's item type. Each element's
     * path segment is `[index]`, appended as its own segment (so it is `.`-joined like any other
     * segment when a change is reported).
     */
    open fun mapArray(value: ArrayValue, schema: ArrayType, path: List<String>): AirbyteValue {
        return ArrayValue(
            value.values.mapIndexed { index, element ->
                map(element, schema.items.type, path + "[$index]", schema.items.nullable)
            }
        )
    }

    /** Identity hook for arrays whose type carries no item schema. */
    open fun mapArrayWithoutSchema(
        value: ArrayValue,
        schema: ArrayTypeWithoutSchema,
        path: List<String>
    ): AirbyteValue = value

    /** Identity hook for union-typed values; the value is not cast before being passed in. */
    open fun mapUnion(value: AirbyteValue, schema: UnionType, path: List<String>): AirbyteValue =
        value

    /** Identity hook for booleans. */
    open fun mapBoolean(value: BooleanValue, path: List<String>): AirbyteValue = value

    /** Identity hook for numbers. */
    open fun mapNumber(value: NumberValue, path: List<String>): AirbyteValue = value

    /** Identity hook for strings. */
    open fun mapString(value: StringValue, path: List<String>): AirbyteValue = value

    /** Identity hook for integers. */
    open fun mapInteger(value: IntegerValue, path: List<String>): AirbyteValue = value

    /** Identity hook for dates. */
    open fun mapDate(value: DateValue, path: List<String>): AirbyteValue = value

    /** Identity hook for times declared with a timezone. */
    open fun mapTimeWithTimezone(value: TimeValue, path: List<String>): AirbyteValue = value

    /** Identity hook for times declared without a timezone. */
    open fun mapTimeWithoutTimezone(value: TimeValue, path: List<String>): AirbyteValue = value

    /** Identity hook for timestamps declared with a timezone. */
    open fun mapTimestampWithTimezone(value: TimestampValue, path: List<String>): AirbyteValue =
        value

    /** Identity hook for timestamps declared without a timezone. */
    open fun mapTimestampWithoutTimezone(value: TimestampValue, path: List<String>): AirbyteValue =
        value

    /**
     * Produces the value used for nulls: for [NullType] fields, for nullable [NullValue] inputs,
     * and as the replacement for any value that failed to map.
     */
    open fun mapNull(path: List<String>): AirbyteValue = NullValue

    /** Identity hook for values of unknown type. */
    open fun mapUnknown(value: UnknownValue, path: List<String>): AirbyteValue = value
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ data class DestinationRecord(
name: String,
data: String,
emittedAtMs: Long,
changes: List<Change>? = null,
changes: MutableList<Change> = mutableListOf(),
) : this(
stream = DestinationStream.Descriptor(namespace, name),
data = JsonToAirbyteValue().convert(Jsons.deserialize(data), ObjectTypeWithoutSchema),
Expand All @@ -62,7 +62,7 @@ data class DestinationRecord(
serialized = "",
)

data class Meta(val changes: List<Change>?) {
data class Meta(val changes: MutableList<Change> = mutableListOf()) {
companion object {
const val COLUMN_NAME_AB_RAW_ID: String = "_airbyte_raw_id"
const val COLUMN_NAME_AB_EXTRACTED_AT: String = "_airbyte_extracted_at"
Expand All @@ -72,11 +72,8 @@ data class DestinationRecord(
}

fun asProtocolObject(): AirbyteRecordMessageMeta =
AirbyteRecordMessageMeta().also {
if (changes != null) {
it.changes = changes.map { change -> change.asProtocolObject() }
}
}
AirbyteRecordMessageMeta()
.withChanges(changes.map { change -> change.asProtocolObject() })
}

data class Change(
Expand Down Expand Up @@ -277,17 +274,20 @@ class DestinationMessageFactory(private val catalog: DestinationCatalog) {
data = JsonToAirbyteValue().convert(message.record.data, stream.schema),
emittedAtMs = message.record.emittedAt,
meta =
message.record.meta?.let { meta ->
DestinationRecord.Meta(
meta.changes?.map {
DestinationRecord.Change(
field = it.field,
change = it.change,
reason = it.reason,
)
}
)
},
DestinationRecord.Meta(
changes =
message.record.meta
?.changes
?.map {
DestinationRecord.Change(
field = it.field,
change = it.change,
reason = it.reason,
)
}
?.toMutableList()
?: mutableListOf()
),
serialized = serialized
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.test.util.ValueTestBuilder
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

/**
 * Tests for [AirbyteValueIdentityMapper]: values that match their schema come back unchanged with
 * no recorded changes, and a value that does not match its schema is nulled and reported.
 */
class AirbyteValueIdentityMapperTest {
    /** Every supported value/type pair, including nested records and arrays, maps to itself. */
    @Test
    fun testIdentityMapping() {
        val (inputValues, inputSchema, expectedValues) =
            ValueTestBuilder()
                .with(StringValue("a"), StringType)
                .with(IntegerValue(1), IntegerType)
                .with(BooleanValue(true), BooleanType)
                .with(TimestampValue("2021-01-01T12:00:00Z"), TimestampTypeWithTimezone)
                .with(TimestampValue("2021-01-01T12:00:00"), TimestampTypeWithoutTimezone)
                .with(TimeValue("12:00:00Z"), TimeTypeWithTimezone)
                .with(TimeValue("12:00:00"), TimeTypeWithoutTimezone)
                .with(DateValue("2021-01-01"), DateType)
                .withRecord()
                .with(
                    ArrayValue(listOf("a", "b", "c").map(::StringValue)),
                    ArrayType(FieldType(StringType, false))
                )
                .with(
                    ArrayValue(listOf(IntegerValue(1), BooleanValue(true))),
                    ArrayTypeWithoutSchema
                )
                .withRecord()
                .with(NullValue, NullType)
                .endRecord()
                .endRecord()
                .build()

        val meta = DestinationRecord.Meta()
        val values = AirbyteValueIdentityMapper(meta).map(inputValues, inputSchema)
        Assertions.assertEquals(expectedValues, values)
        Assertions.assertTrue(meta.changes.isEmpty(), "Unexpected changes: ${meta.changes}")
    }

    /** A timestamp value declared as a time type is nulled and reported under its field name. */
    @Test
    fun testIdentityMappingWithBadSchema() {
        val (inputValues, inputSchema, _) =
            ValueTestBuilder()
                .with(StringValue("a"), StringType)
                .with(
                    TimestampValue("2021-01-01T12:00:00Z"),
                    TimeTypeWithTimezone,
                    nameOverride = "bad"
                )
                .build()
        val meta = DestinationRecord.Meta()
        val values = AirbyteValueIdentityMapper(meta).map(inputValues, inputSchema) as ObjectValue

        Assertions.assertTrue(meta.changes.isNotEmpty(), "Expected a change for field 'bad'")
        Assertions.assertTrue(
            values.values["bad"] is NullValue,
            "Expected 'bad' to be nulled, but was ${values.values["bad"]}"
        )

        // assertEquals (rather than assertTrue(a == b)) so a failure shows the actual value.
        val change = meta.changes[0]
        Assertions.assertEquals("bad", change.field)
        Assertions.assertEquals(AirbyteRecordMessageMetaChange.Change.NULLED, change.change)
        Assertions.assertEquals(
            AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR,
            change.reason
        )
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class RecordDifferTest {
Missing record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00Z), name=StringValue(value=alice), phone=StringValue(value=1234)}), airbyteMeta=null)
Incorrect record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:02Z)):
generationId: Expected 42, got 41
airbyteMeta: Expected Meta(changes=null, syncId=42), got null
airbyteMeta: Expected Meta(changes=[], syncId=42), got null
phone: Expected StringValue(value=1234), but was StringValue(value=5678)
email: Expected StringValue(value=charlie@example.com), but was <unset>
address: Expected <unset>, but was StringValue(value=1234 charlie street)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,27 @@ class SchemaTestBuilder(
val parent: SchemaTestBuilder? = null
) {

fun with(given: FieldType, expected: FieldType = given): SchemaTestBuilder {
val name = UUID.randomUUID().toString()
fun with(
given: FieldType,
expected: FieldType = given,
nameOverride: String? = null
): SchemaTestBuilder {
val name = nameOverride ?: UUID.randomUUID().toString()
inputSchema.properties[name] = given
expectedSchema.properties[name] = expected
return this
}

fun with(given: AirbyteType, expected: AirbyteType = given): SchemaTestBuilder {
return with(FieldType(given, false), FieldType(expected, false))
fun with(
given: AirbyteType,
expected: AirbyteType = given,
nameOverride: String? = null
): SchemaTestBuilder {
return with(FieldType(given, false), FieldType(expected, false), nameOverride)
}

fun withRecord(nullable: Boolean = false): SchemaTestBuilder {
val name = UUID.randomUUID().toString()
fun withRecord(nullable: Boolean = false, nameOverride: String? = null): SchemaTestBuilder {
val name = nameOverride ?: UUID.randomUUID().toString()
val inputRecord = ObjectType(properties = LinkedHashMap())
val outputRecord = ObjectType(properties = LinkedHashMap())
inputSchema.properties[name] = FieldType(inputRecord, nullable = nullable)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.test.util

import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectValue
import java.util.UUID

/**
 * Test helper that assembles, field by field, an input [ObjectValue], the [ObjectValue] a mapper
 * is expected to produce from it, and (through a [SchemaTestBuilder]) the matching schema.
 *
 * Nested records are opened with [withRecord] and closed with [endRecord]; [build] may only be
 * called on the outermost builder.
 */
data class ValueTestBuilder(
    private val inputValues: ObjectValue = ObjectValue(linkedMapOf()),
    private val expectedValues: ObjectValue = ObjectValue(linkedMapOf()),
    private val schemaTestBuilder: SchemaTestBuilder = SchemaTestBuilder(),
    private val parent: ValueTestBuilder? = null
) {
    /**
     * Adds one field to the record currently being built and returns this builder.
     *
     * @param inputValue value stored in the input record.
     * @param inputSchema type registered for the field in the schema builder.
     * @param expectedValue value stored in the expected record; defaults to [inputValue].
     * @param nameOverride field name to use; a random UUID string is generated when null.
     */
    fun with(
        inputValue: AirbyteValue,
        inputSchema: AirbyteType,
        expectedValue: AirbyteValue = inputValue,
        nameOverride: String? = null
    ): ValueTestBuilder = apply {
        val fieldName = nameOverride ?: UUID.randomUUID().toString()
        inputValues.values[fieldName] = inputValue
        expectedValues.values[fieldName] = expectedValue
        // The schema side must use the same (possibly generated) name as the value side.
        schemaTestBuilder.with(inputSchema, nameOverride = fieldName)
    }

    /**
     * Opens a nested record under a random UUID name and returns a child builder for it. The
     * child writes into fresh input/expected objects that are attached to this builder's records.
     */
    fun withRecord(): ValueTestBuilder {
        val recordName = UUID.randomUUID().toString()
        val nestedInput = ObjectValue(linkedMapOf()).also { inputValues.values[recordName] = it }
        val nestedExpected =
            ObjectValue(linkedMapOf()).also { expectedValues.values[recordName] = it }
        return ValueTestBuilder(
            inputValues = nestedInput,
            expectedValues = nestedExpected,
            schemaTestBuilder = schemaTestBuilder.withRecord(nameOverride = recordName),
            parent = this
        )
    }

    /**
     * Closes the current nested record and returns a copy of the parent builder that carries the
     * schema builder returned by ending the record on the schema side.
     *
     * @throws IllegalStateException if this builder has no parent.
     */
    fun endRecord(): ValueTestBuilder {
        val enclosing = parent ?: throw IllegalStateException("Cannot end record without parent")
        return enclosing.copy(schemaTestBuilder = schemaTestBuilder.endRecord())
    }

    /**
     * Returns the input values, the first component of the schema builder's result, and the
     * expected values.
     *
     * @throws IllegalStateException if called on a builder that still has a parent.
     */
    fun build(): Triple<ObjectValue, ObjectType, ObjectValue> {
        check(parent == null) { "Cannot build nested schema" }
        val inputSchema = schemaTestBuilder.build().first
        return Triple(inputValues, inputSchema, expectedValues)
    }
}
Loading

0 comments on commit 15ff3ee

Please sign in to comment.