diff --git a/lib/debezium/types.go b/lib/debezium/types.go index 21b454089..baf79cec9 100644 --- a/lib/debezium/types.go +++ b/lib/debezium/types.go @@ -134,12 +134,21 @@ func (f Field) ParseValue(value any) (any, error) { DateKafkaConnect, TimeKafkaConnect, DateTimeKafkaConnect: - if _, ok := value.(float64); !ok { - // Since this value is coming from Kafka, and will have been marshaled to a JSON string, it should always - // be a float64. Let's check this if this assumption holds and if so clean up the code below so that we - // aren't doing float -> string -> float. + + switch value.(type) { + case float64: + // This value is coming from Kafka, and will have been marshaled to a JSON string, so when it is + // unmarshalled it will be a float64 + // -> Pass. + case int64: + // This value is coming from reader. + // -> Pass. + default: + // Value should always be a float64 int64, but let's check this if this assumption holds and if so clean up + // the code below so that we aren't doing float -> string -> float. slog.Error(fmt.Sprintf("Expected float64 received %T with value '%v'", value, value)) } + // Need to cast this as a FLOAT first because the number may come out in scientific notation // ParseFloat is apt to handle it, and ParseInt is not, see: https://github.com/golang/go/issues/19288 floatVal, castErr := strconv.ParseFloat(fmt.Sprint(value), 64) diff --git a/lib/debezium/types_test.go b/lib/debezium/types_test.go index dbcef0bed..5f6655ae7 100644 --- a/lib/debezium/types_test.go +++ b/lib/debezium/types_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/artie-labs/transfer/lib/typing/decimal" + "github.com/artie-labs/transfer/lib/typing/ext" "github.com/stretchr/testify/assert" ) @@ -275,6 +276,51 @@ func TestField_ParseValue(t *testing.T) { value: `[[{"foo":"bar", "foo": "bar"}], [{"hello":"world"}, {"dusty":"the mini aussie"}]]`, expectedValue: `[[{"foo":"bar"}],[{"hello":"world"},{"dusty":"the mini aussie"}]]`, }, + { + name: "int64 micro-timestamp", + field: Field{ + Type: Int64, + DebeziumType: MicroTimestamp, + }, + value: int64(1712609795827000), + expectedValue: &ext.ExtendedTime{ + Time: time.Date(2024, time.April, 8, 20, 56, 35, 827000000, time.UTC), + NestedKind: ext.NestedKind{ + Type: ext.DateTimeKindType, + Format: "2006-01-02T15:04:05.999999999Z07:00", + }, + }, + }, + { + name: "float64 micro-timestamp", + field: Field{ + Type: Int64, + DebeziumType: MicroTimestamp, + }, + value: float64(1712609795827000), + expectedValue: &ext.ExtendedTime{ + Time: time.Date(2024, time.April, 8, 20, 56, 35, 827000000, time.UTC), + NestedKind: ext.NestedKind{ + Type: ext.DateTimeKindType, + Format: "2006-01-02T15:04:05.999999999Z07:00", + }, + }, + }, + { + name: "string micro-timestamp", + field: Field{ + Type: Int64, + DebeziumType: MicroTimestamp, + }, + value: "1712609795827000", + expectedValue: &ext.ExtendedTime{ + Time: time.Date(2024, time.April, 8, 20, 56, 35, 827000000, time.UTC), + NestedKind: ext.NestedKind{ + Type: ext.DateTimeKindType, + Format: "2006-01-02T15:04:05.999999999Z07:00", + }, + }, + }, } for _, testCase := range testCases {