Skip to content

Commit

Permalink
[debezium] Log if time value is not a float64 (#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Apr 8, 2024
1 parent 244d454 commit 46e8984
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
14 changes: 10 additions & 4 deletions lib/debezium/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ const (
KafkaDecimalPrecisionKey = "connect.decimal.precision"
)

// ToBytes attempts to convert a value of unknown type to a slice of bytes.
// toBytes attempts to convert a value of unknown type to a slice of bytes.
// - If value is already a slice of bytes it will be directly returned.
// - If value is a string we will attempt to base64 decode it.
// - If value is any other type we will convert it to a string and then attempt to base64 decode it.
func ToBytes(value any) ([]byte, error) {
func toBytes(value any) ([]byte, error) {
var stringVal string

switch typedValue := value.(type) {
Expand Down Expand Up @@ -99,7 +99,7 @@ func (f Field) ParseValue(value any) (any, error) {
case GeometryPointType:
return parseGeometryPoint(value)
case KafkaDecimalType:
bytes, err := ToBytes(value)
bytes, err := toBytes(value)
if err != nil {
return nil, err
}
Expand All @@ -115,6 +115,12 @@ func (f Field) ParseValue(value any) (any, error) {
DateKafkaConnect,
TimeKafkaConnect,
DateTimeKafkaConnect:
if _, ok := value.(float64); !ok {
// Since this value is coming from Kafka, and will have been marshaled to a JSON string, it should always
// be a float64. Let's check this if this assumption holds and if so clean up the code below so that we
// aren't doing float -> string -> float.
slog.Error(fmt.Sprintf("Expected float64 received %T with value '%v'", value, value))
}
// Need to cast this as a FLOAT first because the number may come out in scientific notation
// ParseFloat is apt to handle it, and ParseInt is not, see: https://github.com/golang/go/issues/19288
floatVal, castErr := strconv.ParseFloat(fmt.Sprint(value), 64)
Expand Down Expand Up @@ -194,7 +200,7 @@ func (f Field) DecodeDebeziumVariableDecimal(value any) (*decimal.Decimal, error
return nil, fmt.Errorf("encoded value does not exist")
}

bytes, err := ToBytes(val)
bytes, err := toBytes(val)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions lib/debezium/types_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func BenchmarkDecodeDecimal_P64_S10(b *testing.B) {
}
field := Field{Parameters: parameters}
for i := 0; i < b.N; i++ {
bytes, err := ToBytes("AwBGAw8m9GLXrCGifrnVP/8jPHrNEtd1r4rS")
bytes, err := toBytes("AwBGAw8m9GLXrCGifrnVP/8jPHrNEtd1r4rS")
assert.NoError(b, err)
dec, err := field.DecodeDecimal(bytes)
assert.NoError(b, err)
Expand All @@ -31,7 +31,7 @@ func BenchmarkDecodeDecimal_P38_S2(b *testing.B) {
}
field := Field{Parameters: parameters}
for i := 0; i < b.N; i++ {
bytes, err := ToBytes(`AMCXznvJBxWzS58P/////w==`)
bytes, err := toBytes(`AMCXznvJBxWzS58P/////w==`)
assert.NoError(b, err)
dec, err := field.DecodeDecimal(bytes)
assert.NoError(b, err)
Expand All @@ -47,7 +47,7 @@ func BenchmarkDecodeDecimal_P5_S2(b *testing.B) {

field := Field{Parameters: parameters}
for i := 0; i < b.N; i++ {
bytes, err := ToBytes(`AOHJ`)
bytes, err := toBytes(`AOHJ`)
assert.NoError(b, err)
dec, err := field.DecodeDecimal(bytes)
assert.NoError(b, err)
Expand Down
8 changes: 4 additions & 4 deletions lib/debezium/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestToBytes(t *testing.T) {
}

for _, testCase := range testCases {
actual, err := ToBytes(testCase.value)
actual, err := toBytes(testCase.value)

if testCase.expectedErr == "" {
assert.Equal(t, testCase.expectedValue, actual, testCase.name)
Expand Down Expand Up @@ -267,7 +267,7 @@ func TestFromDebeziumTypeTimePrecisionConnect(t *testing.T) {
assert.Equal(t, "2023-03-13", extendedDate.String(""))
}

func TestDecodeDecimal(t *testing.T) {
func TestField_DecodeDecimal(t *testing.T) {
type _testCase struct {
name string
encoded string
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestDecodeDecimal(t *testing.T) {
Parameters: testCase.params,
}

bytes, err := ToBytes(testCase.encoded)
bytes, err := toBytes(testCase.encoded)
assert.NoError(t, err)

dec, err := field.DecodeDecimal(bytes)
Expand All @@ -461,7 +461,7 @@ func TestDecodeDecimal(t *testing.T) {
}
}

func TestDecodeDebeziumVariableDecimal(t *testing.T) {
func TestField_DecodeDebeziumVariableDecimal(t *testing.T) {
type _testCase struct {
name string
value any
Expand Down

0 comments on commit 46e8984

Please sign in to comment.