diff --git a/lib/cdc/util/parse.go b/lib/cdc/util/parse.go index a0809b463..d85d649cc 100644 --- a/lib/cdc/util/parse.go +++ b/lib/cdc/util/parse.go @@ -23,32 +23,38 @@ func parseField(field debezium.Field, value any) (any, error) { return int(valFloat), nil } - if valid, supportedType := debezium.RequiresSpecialTypeCasting(field.DebeziumType); valid { - switch debezium.SupportedDebeziumType(field.DebeziumType) { - case debezium.JSON: - return jsonutil.SanitizePayload(value) - case debezium.GeometryType, debezium.GeographyType: - return parseGeometry(value) - case debezium.GeometryPointType: - return parseGeometryPoint(value) - case debezium.KafkaDecimalType: - bytes, err := debezium.ToBytes(value) - if err != nil { - return nil, err - } - return field.DecodeDecimal(bytes) - case debezium.KafkaVariableNumericType: - return field.DecodeDebeziumVariableDecimal(value) - default: - // Need to cast this as a FLOAT first because the number may come out in scientific notation - // ParseFloat is apt to handle it, and ParseInt is not, see: https://github.com/golang/go/issues/19288 - floatVal, castErr := strconv.ParseFloat(fmt.Sprint(value), 64) - if castErr != nil { - return nil, castErr - } - - return debezium.FromDebeziumTypeToTime(supportedType, int64(floatVal)) + switch field.DebeziumType { + case debezium.JSON: + return jsonutil.SanitizePayload(value) + case debezium.GeometryType, debezium.GeographyType: + return parseGeometry(value) + case debezium.GeometryPointType: + return parseGeometryPoint(value) + case debezium.KafkaDecimalType: + bytes, err := debezium.ToBytes(value) + if err != nil { + return nil, err } + return field.DecodeDecimal(bytes) + case debezium.KafkaVariableNumericType: + return field.DecodeDebeziumVariableDecimal(value) + case + debezium.Timestamp, + debezium.MicroTimestamp, + debezium.Date, + debezium.Time, + debezium.MicroTime, + debezium.DateKafkaConnect, + debezium.TimeKafkaConnect, + debezium.DateTimeKafkaConnect: + // Need to cast this as a FLOAT first because the number may come out in scientific notation + // ParseFloat is apt to handle it, and ParseInt is not, see: https://github.com/golang/go/issues/19288 + floatVal, castErr := strconv.ParseFloat(fmt.Sprint(value), 64) + if castErr != nil { + return nil, castErr + } + + return debezium.FromDebeziumTypeToTime(field.DebeziumType, int64(floatVal)) } return value, nil diff --git a/lib/debezium/types.go b/lib/debezium/types.go index ac9083acb..b12a63e66 100644 --- a/lib/debezium/types.go +++ b/lib/debezium/types.go @@ -16,7 +16,6 @@ import ( type SupportedDebeziumType string const ( - Invalid SupportedDebeziumType = "invalid" JSON SupportedDebeziumType = "io.debezium.data.Json" Enum SupportedDebeziumType = "io.debezium.data.Enum" EnumSet SupportedDebeziumType = "io.debezium.data.EnumSet" @@ -46,33 +45,6 @@ const ( KafkaDecimalPrecisionKey = "connect.decimal.precision" ) -var typesThatRequireTypeCasting = []SupportedDebeziumType{ - Timestamp, - MicroTimestamp, - Date, - Time, - MicroTime, - DateKafkaConnect, - TimeKafkaConnect, - DateTimeKafkaConnect, - KafkaDecimalType, - KafkaVariableNumericType, - JSON, - GeometryPointType, - GeometryType, - GeographyType, -} - -func RequiresSpecialTypeCasting(typeLabel SupportedDebeziumType) (bool, SupportedDebeziumType) { - for _, supportedType := range typesThatRequireTypeCasting { - if typeLabel == supportedType { - return true, supportedType - } - } - - return false, Invalid -} - // ToBytes attempts to convert a value of unknown type to a slice of bytes. // - If value is already a slice of bytes it will be directly returned. // - If value is a string we will attempt to base64 decode it.