From 46e8984e56edac9998d90a5642bec3f3980b7517 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Mon, 8 Apr 2024 11:37:46 -0700 Subject: [PATCH] [debezium] Log if time value is not a `float64` (#414) --- lib/debezium/types.go | 14 ++++++++++---- lib/debezium/types_bench_test.go | 6 +++--- lib/debezium/types_test.go | 8 ++++---- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/lib/debezium/types.go b/lib/debezium/types.go index e7a71c960..44e99c4dc 100644 --- a/lib/debezium/types.go +++ b/lib/debezium/types.go @@ -48,11 +48,11 @@ const ( KafkaDecimalPrecisionKey = "connect.decimal.precision" ) -// ToBytes attempts to convert a value of unknown type to a slice of bytes. +// toBytes attempts to convert a value of unknown type to a slice of bytes. // - If value is already a slice of bytes it will be directly returned. // - If value is a string we will attempt to base64 decode it. // - If value is any other type we will convert it to a string and then attempt to base64 decode it. -func ToBytes(value any) ([]byte, error) { +func toBytes(value any) ([]byte, error) { var stringVal string switch typedValue := value.(type) { @@ -99,7 +99,7 @@ func (f Field) ParseValue(value any) (any, error) { case GeometryPointType: return parseGeometryPoint(value) case KafkaDecimalType: - bytes, err := ToBytes(value) + bytes, err := toBytes(value) if err != nil { return nil, err } @@ -115,6 +115,12 @@ func (f Field) ParseValue(value any) (any, error) { DateKafkaConnect, TimeKafkaConnect, DateTimeKafkaConnect: + if _, ok := value.(float64); !ok { + // Since this value is coming from Kafka, and will have been marshaled to a JSON string, it should always + // be a float64. Let's check this if this assumption holds and if so clean up the code below so that we + // aren't doing float -> string -> float. + slog.Error(fmt.Sprintf("Expected float64 received %T with value '%v'", value, value)) + } // Need to cast this as a FLOAT first because the number may come out in scientific notation // ParseFloat is apt to handle it, and ParseInt is not, see: https://github.com/golang/go/issues/19288 floatVal, castErr := strconv.ParseFloat(fmt.Sprint(value), 64) @@ -194,7 +200,7 @@ func (f Field) DecodeDebeziumVariableDecimal(value any) (*decimal.Decimal, error return nil, fmt.Errorf("encoded value does not exist") } - bytes, err := ToBytes(val) + bytes, err := toBytes(val) if err != nil { return nil, err } diff --git a/lib/debezium/types_bench_test.go b/lib/debezium/types_bench_test.go index 9c557e5b5..00e763e9d 100644 --- a/lib/debezium/types_bench_test.go +++ b/lib/debezium/types_bench_test.go @@ -15,7 +15,7 @@ func BenchmarkDecodeDecimal_P64_S10(b *testing.B) { } field := Field{Parameters: parameters} for i := 0; i < b.N; i++ { - bytes, err := ToBytes("AwBGAw8m9GLXrCGifrnVP/8jPHrNEtd1r4rS") + bytes, err := toBytes("AwBGAw8m9GLXrCGifrnVP/8jPHrNEtd1r4rS") assert.NoError(b, err) dec, err := field.DecodeDecimal(bytes) assert.NoError(b, err) @@ -31,7 +31,7 @@ func BenchmarkDecodeDecimal_P38_S2(b *testing.B) { } field := Field{Parameters: parameters} for i := 0; i < b.N; i++ { - bytes, err := ToBytes(`AMCXznvJBxWzS58P/////w==`) + bytes, err := toBytes(`AMCXznvJBxWzS58P/////w==`) assert.NoError(b, err) dec, err := field.DecodeDecimal(bytes) assert.NoError(b, err) @@ -47,7 +47,7 @@ func BenchmarkDecodeDecimal_P5_S2(b *testing.B) { field := Field{Parameters: parameters} for i := 0; i < b.N; i++ { - bytes, err := ToBytes(`AOHJ`) + bytes, err := toBytes(`AOHJ`) assert.NoError(b, err) dec, err := field.DecodeDecimal(bytes) assert.NoError(b, err) diff --git a/lib/debezium/types_test.go b/lib/debezium/types_test.go index faacaf589..ebcca60ac 100644 --- a/lib/debezium/types_test.go +++ b/lib/debezium/types_test.go @@ -42,7 +42,7 @@ func TestToBytes(t *testing.T) { } for _, testCase := range testCases { - actual, err := ToBytes(testCase.value) + actual, err := toBytes(testCase.value) if testCase.expectedErr == "" { assert.Equal(t, testCase.expectedValue, actual, testCase.name) @@ -267,7 +267,7 @@ func TestFromDebeziumTypeTimePrecisionConnect(t *testing.T) { assert.Equal(t, "2023-03-13", extendedDate.String("")) } -func TestDecodeDecimal(t *testing.T) { +func TestField_DecodeDecimal(t *testing.T) { type _testCase struct { name string encoded string @@ -437,7 +437,7 @@ func TestDecodeDecimal(t *testing.T) { Parameters: testCase.params, } - bytes, err := ToBytes(testCase.encoded) + bytes, err := toBytes(testCase.encoded) assert.NoError(t, err) dec, err := field.DecodeDecimal(bytes) @@ -461,7 +461,7 @@ func TestDecodeDecimal(t *testing.T) { } } -func TestDecodeDebeziumVariableDecimal(t *testing.T) { +func TestField_DecodeDebeziumVariableDecimal(t *testing.T) { type _testCase struct { name string value any