-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: serde for TIME type #7664
Conversation
final ConnectDataTranslator connectToKsqlTranslator = new ConnectDataTranslator(Time.SCHEMA); | ||
|
||
// When: | ||
final Object row = connectToKsqlTranslator.toKsqlRow(Time.SCHEMA, new Date(100L)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a Date
. Why is the result a Time
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Connect time types (Time, Date, Timestamp) store data as java.util.Date
. ConnectDataTranslator
converts data from Connect to KSQL.
case TIMESTAMP: | ||
return handleTimestamp((Timestamp) value); | ||
default: | ||
return value; | ||
} | ||
} | ||
|
||
private static Integer handleTime(final Time value) { | ||
// Return milliseconds | ||
return value == null ? null : (int) value.getTime(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why isn't using long
? I see that getTime()
returns long. In fact, I just noticed that we are using INT32 for times and you're returning longValue()
in the connector converter. Is there a problem with using long and int in different contexts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avro and Protobuf store Time as int, so we're serializing everything to int for consistency. The longValue() in the connect converter eventually gets converted to a java.sql.Time
object, so it doesn't cause problems, but I've updated that and others to use int so that its clearer.
@@ -163,6 +169,14 @@ private static Object enforceFieldType( | |||
} | |||
} | |||
|
|||
private static Object handleInt(final JsonValueContext context) { | |||
if (context.schema.name() == Time.LOGICAL_NAME) { | |||
return JsonSerdeUtils.toTime(context.val); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that inside JsonSerdeUtils.toTime
is obtained the number as long, but here we sai handleInt
. What is the correct type we need to handle?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see previous comment
return new Time(object.asLong()); | ||
return new Time(object.asInt()); | ||
} | ||
if (object instanceof TextNode) { | ||
try { | ||
return new Time(Long.parseLong(object.textValue())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should handle overflow/underflow issues in these two cases. First, are we sure that we cannot handle a Time numeric value that is higher/lower than the INT type? If so, then we need to make sure the Json object does not have a huge value that can cause an overflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a check for this
@@ -131,7 +131,7 @@ private static Parser decimalParser(final SqlType sqlType) { | |||
} | |||
|
|||
private static Parser timeParser(final SqlType sqlType) { | |||
return v -> new Time(Long.parseLong(v)); | |||
return v -> new Time(Integer.parseInt(v)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question about overflow if the type has a value higher than the maximum INT.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a check for this
@@ -227,7 +227,7 @@ private Object toKsqlValue( | |||
} | |||
case INT32: | |||
if (schema.name() == Time.LOGICAL_NAME) { | |||
return new java.sql.Time(((Number) convertedValue).longValue()); | |||
return new java.sql.Time(((Number) convertedValue).intValue()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assumed that being the number INT32, then intValue
won't return an overflowed/underflowed value if it has a higher/lower number, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, Connect does this check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Description
Implement serde for TIME. TIME gets serialized to an integer number of milliseconds, and deserialized to
java.sql.time
Testing done
Added unit tests
Reviewer checklist