-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: default timestamp extractor override is not working #3176
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall this LGTM, but I'm trying to understand why doesn't it work in the CLI?
|
||
return (TimestampExtractor) defaultTimestampExtractor.newInstance(); | ||
} catch (final Exception e) { | ||
// TODO: Need to log invalid timestamp configuration here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not violently fail (e.g. raise exception) here? I think I'd rather have my command fail than do something which wasn't what i defined
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot to remove that line. I left it as a reminder for me to do something there before submitting the PR.
Anyway, you're right, I should fail instead of just logging the error. I submitted another PR (this time for 5.3.x branch instead of master) which fails in that case.
@@ -230,6 +233,51 @@ public void shouldBuildSerdeOptions() { | |||
assertThat(cmd.getSerdeOptions(), is(SOME_SERDE_OPTIONS)); | |||
} | |||
|
|||
@Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a QTT for this? we should be able to pass it in the property overrides for a command
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just added one for this.
7970a25
to
4f2d7d5
Compare
4f2d7d5
to
ae653d6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @spena! LGTM
seems very strange to me also that this doesn't work using |
I agree with @blueedgenick, this should work from CLI too. The problem with |
@agavra I submitted another commit to fix the issue when the CLI fails sending a REST request that contains a Class value in the stream properties field. |
@hjafarpour I decided to convert the Class values of the stream properties to String values. For some reason, Jackson does not detect the class in the classpath when creating the JSON object. The jar is in the classpath, in fact, the Class is available in the streams properties as The code is working now. If you're fine with it, then I can merge it later. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @spena
Added a few comments.
@@ -31,9 +37,23 @@ public static TimestampExtractionPolicy create( | |||
final KsqlSchema schema, | |||
final Optional<String> timestampColumnName, | |||
final Optional<String> timestampFormat | |||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was used only in one place, LogicalPlanner
. Why not remove this method and just use the next one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll do that, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class)); | ||
} | ||
|
||
@Test(expected = ConfigException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use ExpectedException
so you can also check the correctness of the error messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I followed how other tests cases of this class verify error exceptions.
Btw, What is our preference? Should I modify the other tests cases with the ExpectdException
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExpectdException
would be preferred. Check KsqlRequestTest
for examples.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
); | ||
} | ||
|
||
@Test(expected = KsqlException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto here, use ExpectedException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -46,7 +47,7 @@ public KsqlRequest( | |||
this.ksql = ksql == null ? "" : ksql; | |||
this.streamsProperties = streamsProperties == null | |||
? Collections.emptyMap() | |||
: Collections.unmodifiableMap(new HashMap<>(streamsProperties)); | |||
: Collections.unmodifiableMap(new HashMap<>(serializeClassValues(streamsProperties))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to add a test for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
5b72f5f
to
598c27a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Description
The use of ksql.streams.default.timestamp.extractor when creating a stream/table is not working. The new value is persisted in the command topic, but KSQL always use a default FailOnInvalidTimestamp
This patch fixes KSQL so it honors the new default specified.
Testing done
Added unit tests.
Verified a stream created with
'ksql.streams.default.timestamp.extractor'='org.apache.kafka.streams.processor.WallclockTimestampExtractor'
results in queries displaying the current Wallclock time.Verified a stream created with no timestamp extractor uses the default
FailedOnInvalidTimestamp
, which displays the timestamp it was used during the insertion of the row.To test it, I had to call the REST api instead of the CLI because I couldn't make the CLI to recognize a different timestamp extractor:
Reviewer checklist