fix: support unwrapped struct value inference #6446
@@ -122,6 +123,7 @@ public DefaultSchemaInjector(final TopicSchemaSupplier schemaSupplier) {
   props.getKafkaTopic(),
   props.getKeySchemaId(),
   keyFormat,
+  SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES),
I don't like that this is hard-coded here. If it's to be hard-coded somewhere, I'd prefer it to be further down in the call stack, e.g., in the schema supplier or even in the schema translator. The drawback to that would be that we'd no longer be able to share code between the key and value inference pathways as easily.
First off, you should only be setting this flag if the format supports unwrapping! Or more correctly, if the format supports both unwrapping and wrapping.
As to hard-coding it... hmm, yes, so far it's been hard-coded in one place: SerdeFeaturesFactory.
We could do something like:
- SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES),
+ SerdeFeaturesFactory.buildInternal(FormatFactory.of(keyFormat)),
But, to be honest, this is only a temporary thing. At the moment, we only support unwrapped keys. Once we support something else we'll wire it up and this hard-coding will go.
Good call on only setting the feature if it's supported.
I worry the naming of buildInternal() will be confusing here since it's not clear what internal topics have to do with schema registry. Is it worth introducing another alias for the method such as setUnwrappingIfSupported() or would that only make things worse?
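For illustration, the "only set the feature if it's supported" idea could look roughly like the sketch below. It is self-contained and does not use ksqlDB's real classes: the Feature enum and the unwrappingIfSupported helper are hypothetical stand-ins, and the rule it encodes (enable unwrapping only when the format supports both wrapping and unwrapping) is taken from the comment above.

```java
import java.util.EnumSet;
import java.util.Set;

final class UnwrapIfSupportedSketch {

  // Hypothetical stand-in for ksqlDB's SerdeFeature enum; not the real type.
  enum Feature { WRAP_SINGLES, UNWRAP_SINGLES }

  // Hypothetical helper: request unwrapping only when the format can both
  // wrap and unwrap; otherwise request no explicit feature at all.
  static Set<Feature> unwrappingIfSupported(final Set<Feature> supportedByFormat) {
    final boolean supportsBoth = supportedByFormat.contains(Feature.WRAP_SINGLES)
        && supportedByFormat.contains(Feature.UNWRAP_SINGLES);
    return supportsBoth
        ? EnumSet.of(Feature.UNWRAP_SINGLES)
        : EnumSet.noneOf(Feature.class);
  }

  public static void main(final String[] args) {
    // A format that supports both behaviours gets the explicit flag.
    System.out.println(unwrappingIfSupported(EnumSet.allOf(Feature.class)));
    // A format that supports neither gets an empty feature set.
    System.out.println(unwrappingIfSupported(EnumSet.noneOf(Feature.class)));
  }
}
```

A name along these lines says what the call does at the call site, which is the concern raised about buildInternal(); whether an extra alias is worth it is still a judgment call.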
LGTM
columns = translator.toColumns(
    parsedSchema,
    isKey,
    serdeFeatures.enabled(SerdeFeature.UNWRAP_SINGLES)
I'd probably just pass all the features down. There could be other features later that also affect the schema.
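A toy sketch of what passing the whole feature set down could look like, as opposed to a single boolean. Everything here is a hypothetical stand-in (ToySchema, Feature, the "KEY"/"VALUE" placeholder column names); it only shows the shape of the signature and how the translator itself decides which features matter.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

final class FeatureAwareTranslatorSketch {

  // Hypothetical stand-in for ksqlDB's SerdeFeature enum; not the real type.
  enum Feature { WRAP_SINGLES, UNWRAP_SINGLES }

  // Toy schema: either a struct with named fields, or an anonymous single type.
  static final class ToySchema {
    final boolean struct;
    final List<String> fieldNames;

    ToySchema(final boolean struct, final List<String> fieldNames) {
      this.struct = struct;
      this.fieldNames = fieldNames;
    }
  }

  // The translator receives all features and inspects the ones it cares about,
  // so a future schema-affecting feature needs no signature change.
  static List<String> toColumns(
      final ToySchema schema, final Set<Feature> features, final boolean isKey) {
    if (features.contains(Feature.UNWRAP_SINGLES)) {
      // The whole schema is one column, even when it happens to be a struct.
      return Collections.singletonList(isKey ? "KEY" : "VALUE");
    }
    if (!schema.struct) {
      throw new IllegalArgumentException("Anonymous schema requires UNWRAP_SINGLES");
    }
    // Wrapped case: each struct field becomes its own column.
    return schema.fieldNames;
  }

  public static void main(final String[] args) {
    final ToySchema struct = new ToySchema(true, Arrays.asList("A", "B"));
    System.out.println(toColumns(struct, EnumSet.noneOf(Feature.class), false));
    System.out.println(toColumns(struct, EnumSet.of(Feature.UNWRAP_SINGLES), false));
  }
}
```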
  Schema connectSchema = connectSrTranslator.toConnectSchema(schema);

- if (connectSchema.type() != Type.STRUCT) {
+ if (connectSchema.type() != Type.STRUCT || unwrapSingle) {
I don't think we need to check both. Just checking the serde features is enough. So I think this should become:
@Override
public List<SimpleColumn> toColumns(
    final ParsedSchema schema,
    final SerdeFeatures serdeFeatures,
    final boolean isKey
) {
  SerdeUtils.throwOnUnsupportedFeatures(serdeFeatures, format.supportedFeatures());
  Schema connectSchema = connectSrTranslator.toConnectSchema(schema);
  if (serdeFeatures.enabled(SerdeFeature.UNWRAP_SINGLES)) {
    connectSchema = SerdeUtils.wrapSingle(connectSchema, isKey);
  }
  if (connectSchema.type() != Type.STRUCT) {
    if (isKey) {
      throw new IllegalStateException("Key schemas are always unwrapped.");
    }
    throw new KsqlException("Schema returned from schema registry is anonymous type. "
        + "To use this schema with ksqlDB, set '" + CommonCreateConfigs.WRAP_SINGLE_VALUE
        + "=false' in the WITH clause properties.");
  }
  final Schema rowSchema = connectKsqlTranslator.apply(connectSchema);
  return rowSchema.fields().stream()
      .map(ConnectFormatSchemaTranslator::toColumn)
      .collect(Collectors.toList());
}
Much cleaner! Thanks for the suggestion.
Wanted to check that we're OK removing the exception that's thrown if an unwrapped schema is encountered for a format that does not support unwrapping. It makes sense to me that we're removing this, since the only way we'd end up in this situation is if ksqlDB designated a format that supports unwrapping as not supporting it, but maybe it's better to be defensive and leave the check in?
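If the defensive route is preferred, the check can be a plain set difference between requested and supported features. The sketch below is in the spirit of the SerdeUtils.throwOnUnsupportedFeatures call in the suggestion above, but it is not that method: the Feature enum, the helper name, and the exception type are all assumptions made for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

final class UnsupportedFeatureCheckSketch {

  // Hypothetical stand-in for ksqlDB's SerdeFeature enum; not the real type.
  enum Feature { WRAP_SINGLES, UNWRAP_SINGLES }

  // Hypothetical defensive check: fail fast when any requested feature is
  // missing from the set the format declares as supported.
  static void throwOnUnsupported(final Set<Feature> requested, final Set<Feature> supported) {
    final Set<Feature> unsupported = EnumSet.noneOf(Feature.class);
    unsupported.addAll(requested);
    unsupported.removeAll(supported);
    if (!unsupported.isEmpty()) {
      throw new IllegalArgumentException("Unsupported features: " + unsupported);
    }
  }

  public static void main(final String[] args) {
    // Passes silently: the requested feature is supported.
    throwOnUnsupported(EnumSet.of(Feature.UNWRAP_SINGLES), EnumSet.allOf(Feature.class));
    try {
      // Fails: the format declares no support for unwrapping.
      throwOnUnsupported(EnumSet.of(Feature.UNWRAP_SINGLES), EnumSet.noneOf(Feature.class));
    } catch (final IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```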
Merging in order to unblock the other PR behind this one -- will address anything that comes out of the open discussions in a follow-up PR.
Description
Fixes #6444
The cause of the bug is that the DefaultSchemaInjector always unwraps struct schemas (assuming that they're wrapped), ignoring any serde features that should cause it to not do so (specifically, wrap_single_value=false). This PR fixes the bug by passing serde features into the schema translator, which is admittedly hacky and ugly but I couldn't think of a way around this.
Testing done
QTT
Reviewer checklist