Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify parseField #410

Merged
merged 2 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 31 additions & 25 deletions lib/cdc/util/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,38 @@ func parseField(field debezium.Field, value any) (any, error) {
return int(valFloat), nil
}

if valid, supportedType := debezium.RequiresSpecialTypeCasting(field.DebeziumType); valid {
switch debezium.SupportedDebeziumType(field.DebeziumType) {
case debezium.JSON:
return jsonutil.SanitizePayload(value)
case debezium.GeometryType, debezium.GeographyType:
return parseGeometry(value)
case debezium.GeometryPointType:
return parseGeometryPoint(value)
case debezium.KafkaDecimalType:
bytes, err := debezium.ToBytes(value)
if err != nil {
return nil, err
}
return field.DecodeDecimal(bytes)
case debezium.KafkaVariableNumericType:
return field.DecodeDebeziumVariableDecimal(value)
default:
// Need to cast this as a FLOAT first because the number may come out in scientific notation
// ParseFloat is apt to handle it, and ParseInt is not, see: https://github.com/golang/go/issues/19288
floatVal, castErr := strconv.ParseFloat(fmt.Sprint(value), 64)
if castErr != nil {
return nil, castErr
}

return debezium.FromDebeziumTypeToTime(supportedType, int64(floatVal))
switch field.DebeziumType {
case debezium.JSON:
return jsonutil.SanitizePayload(value)
case debezium.GeometryType, debezium.GeographyType:
return parseGeometry(value)
case debezium.GeometryPointType:
return parseGeometryPoint(value)
case debezium.KafkaDecimalType:
bytes, err := debezium.ToBytes(value)
if err != nil {
return nil, err
}
return field.DecodeDecimal(bytes)
case debezium.KafkaVariableNumericType:
return field.DecodeDebeziumVariableDecimal(value)
case
debezium.Timestamp,
debezium.MicroTimestamp,
debezium.Date,
debezium.Time,
debezium.MicroTime,
debezium.DateKafkaConnect,
debezium.TimeKafkaConnect,
debezium.DateTimeKafkaConnect:
// Need to cast this as a FLOAT first because the number may come out in scientific notation
// ParseFloat is apt to handle it, and ParseInt is not, see: https://github.com/golang/go/issues/19288
floatVal, castErr := strconv.ParseFloat(fmt.Sprint(value), 64)
if castErr != nil {
return nil, castErr
}

return debezium.FromDebeziumTypeToTime(field.DebeziumType, int64(floatVal))
}

return value, nil
Expand Down
28 changes: 0 additions & 28 deletions lib/debezium/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
type SupportedDebeziumType string

const (
Invalid SupportedDebeziumType = "invalid"
JSON SupportedDebeziumType = "io.debezium.data.Json"
Enum SupportedDebeziumType = "io.debezium.data.Enum"
EnumSet SupportedDebeziumType = "io.debezium.data.EnumSet"
Expand Down Expand Up @@ -46,33 +45,6 @@ const (
KafkaDecimalPrecisionKey = "connect.decimal.precision"
)

var typesThatRequireTypeCasting = []SupportedDebeziumType{
Timestamp,
MicroTimestamp,
Date,
Time,
MicroTime,
DateKafkaConnect,
TimeKafkaConnect,
DateTimeKafkaConnect,
KafkaDecimalType,
KafkaVariableNumericType,
JSON,
GeometryPointType,
GeometryType,
GeographyType,
}

func RequiresSpecialTypeCasting(typeLabel SupportedDebeziumType) (bool, SupportedDebeziumType) {
for _, supportedType := range typesThatRequireTypeCasting {
if typeLabel == supportedType {
return true, supportedType
}
}

return false, Invalid
}

// ToBytes attempts to convert a value of unknown type to a slice of bytes.
// - If value is already a slice of bytes it will be directly returned.
// - If value is a string we will attempt to base64 decode it.
Expand Down