Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load CDK: avro tests have temporal types #51546

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class AirbyteValueWithMetaToOutputRecord {
Instant.ofEpochMilli(
value.values[Meta.COLUMN_NAME_AB_EXTRACTED_AT].let { v ->
when (v) {
is IntegerValue -> v.value.toLong()
is TimestampWithTimezoneValue -> v.value.toEpochSecond()
else -> throw IllegalArgumentException("Invalid extractedAt value: $v")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,63 +7,35 @@ package io.airbyte.cdk.load.data.avro
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
import io.airbyte.cdk.load.test.util.OutputRecord
import java.time.LocalDate
import java.time.ZoneOffset
import java.time.temporal.ChronoField
import java.time.temporal.TemporalAccessor

object AvroExpectedRecordMapper : ExpectedRecordMapper {
override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord {
return expectedRecord.copy(data = timestampsToInteger(expectedRecord.data) as ObjectValue)
}

/**
* Avro doesn't have true temporal types. Instead, we write dates as epoch days, and other
* temporal types as epochMicros. Therefore, in expected records, we should convert from real
* temporal types to IntegerValue.
* Avro doesn't have distinguish between temporal types having/not having timezone. So we map
* all temporal types to their "with timezone" variant, defaulting to UTC.
*/
private fun timestampsToInteger(value: AirbyteValue): AirbyteValue =
when (value) {
is DateValue -> IntegerValue(value.value.toEpochDay())
is TimestampWithTimezoneValue -> {
val micros = getMicros(value.value)
val epochSecond = value.value.toEpochSecond()
integerValue(epochSecond, micros)
}
is TimestampWithoutTimezoneValue -> {
val micros = getMicros(value.value)
val epochSecond = value.value.toEpochSecond(ZoneOffset.UTC)
integerValue(epochSecond, micros)
}
is TimeWithTimezoneValue -> {
val micros = getMicros(value.value)
val epochSecond = value.value.toEpochSecond(LocalDate.EPOCH)
integerValue(epochSecond, micros)
}
is TimeWithoutTimezoneValue -> {
val micros = getMicros(value.value)
val epochSecond = value.value.toEpochSecond(LocalDate.EPOCH, ZoneOffset.UTC)
integerValue(epochSecond, micros)
}
is TimestampWithoutTimezoneValue ->
TimestampWithTimezoneValue(value.value.atOffset(ZoneOffset.UTC))
is TimeWithoutTimezoneValue ->
TimeWithTimezoneValue(value.value.atOffset(ZoneOffset.UTC))
is ArrayValue -> ArrayValue(value.values.map { timestampsToInteger(it) })
is ObjectValue ->
ObjectValue(
value.values.mapValuesTo(linkedMapOf()) { (_, v) -> timestampsToInteger(v) }
)
else -> value
}

private fun getMicros(value: TemporalAccessor) = value.getLong(ChronoField.MICRO_OF_SECOND)

private fun integerValue(epochSecond: Long, micros: Long) =
IntegerValue(epochSecond * 1_000_000 + micros)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,39 @@ package io.airbyte.cdk.load.data.avro
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import java.time.Instant
import java.time.LocalDate
import java.time.OffsetTime
import java.time.ZoneOffset
import org.apache.avro.LogicalType
import org.apache.avro.LogicalTypes
import org.apache.avro.Schema
import org.apache.avro.generic.GenericArray
import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8

object AvroRecordToAirbyteValue {
fun convert(avroValue: Any?): AirbyteValue {
fun convert(avroValue: Any?, schema: Schema): AirbyteValue {
return when (avroValue) {
null -> NullValue
is GenericRecord ->
ObjectValue(
avroValue.schema.fields.associateTo(linkedMapOf()) { field ->
field.name() to convert(avroValue.get(field.name()))
field.name() to convert(avroValue.get(field.name()), field.schema())
}
)
is GenericArray<*> -> ArrayValue(avroValue.map { convert(it) })
is GenericArray<*> -> ArrayValue(avroValue.map { convert(it, schema.elementType) })
is Boolean -> BooleanValue(avroValue)
is Int -> IntegerValue(avroValue.toLong())
is Long -> IntegerValue(avroValue)
is Int -> handleInt(avroValue.toLong(), schema.logicalType)
is Long -> handleInt(avroValue, schema.logicalType)
is Double -> NumberValue(avroValue.toBigDecimal())
is Utf8 -> StringValue(avroValue.toString())
is String -> StringValue(avroValue)
Expand All @@ -41,6 +51,16 @@ object AvroRecordToAirbyteValue {
}
}

private fun handleInt(l: Long, logicalType: LogicalType?): AirbyteValue =
when (logicalType) {
LogicalTypes.date() -> DateValue(LocalDate.ofEpochDay(l))
LogicalTypes.timeMicros() ->
TimeWithTimezoneValue(OffsetTime.ofInstant(Instant.ofEpochMilli(l), ZoneOffset.UTC))
LogicalTypes.timestampMicros() ->
TimestampWithTimezoneValue(Instant.ofEpochMilli(l).atOffset(ZoneOffset.UTC))
else -> IntegerValue(l)
}

fun GenericRecord.toAirbyteValue(): AirbyteValue {
return AvroRecordToAirbyteValue.convert(this)
return AvroRecordToAirbyteValue.convert(this, this.schema)
}
Loading