Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClassTag is provided in params in deserialization schema factory … #1499

Merged

Conversation

gskrobisz
Copy link
Member

ClassTag is provided in params so one deserialization schema factory will work for multiple deserialization schemas

@gskrobisz gskrobisz linked an issue Mar 31, 2021 that may be closed by this pull request
Copy link
Member

@arkadius arkadius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good change! It need some slight modifications but looks promising.

val valueSchemaData = valueSchemaDeterminer.determineSchemaUsedInTyping.valueOr(SchemaDeterminerErrorHandler.handleSchemaRegistryErrorAndThrowException)
val valueSchemaUsedInRuntime = valueSchemaDeterminer.toRuntimeSchema(valueSchemaData)
// key schema, optional
val keySchemaDataUsedInRuntime = Option(keySchemaDeterminer).flatMap(determiner => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Use flatMap { determiner => ... instead of flatMap(determiner => { to reduce number of braces.
  2. if you see nested flatMap/map inside flatMap you can use for-comprehension it will be more straightforward
  3. Don't handle error as a None. Let's handle it the same way as it is handled in value approach. On the top level (KafkaAvroBaseTransformer maybe?) we should define determiner that should not even ask schema registry for key schema. It should be done only in tests of TupleAvroKeyValueKafkaAvroDeserializerSchemaFactory

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. "Empty" Determiner is provided as FixedNoneSchemaDeterminer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing keySchemaDeterminer is ok, but I don't like this getOrElse. Why you pass determiner if you override it's decision? Why not to just throw exception here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. There is one schema determiner used in both contextTransformation and implementation. See BasedOnVersionWithFallbackAvroSchemaDeterminer

valueSchemaDeterminer: AvroSchemaDeterminer,
keySchemaDeterminer: AvroSchemaDeterminer,
returnGenericAvroType: Boolean,
valueClassTagOpt: Option[ClassTag[_]] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of optional parameters in such a low level api. IMO it is only good for some domain level classes where you want to have some fancy factory methods combinations or in tests. In production code it is easy to miss argument in some invocation where it should be passed.

Copy link
Member Author

@gskrobisz gskrobisz Apr 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Optional ClassTag parameters are removed (replaced by [K: ClassTag, V: ClassTag])


object KafkaAvroKeyValueDeserializationSchemaFactory {

val fallbackKeyAsStringDeserializer: Deserializer[String] = new Deserializer[String] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. this deserializer is not a fallbck deserializer. It just derialize to string. It is used as a fallback so "fallback" word could be placed in the place of usage.
  2. lest create named class StringDeserializer - it wil help in some debugging
  3. why you haven't used existing org.apache.kafka.common.serialization.StringDeserializer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Object is removed, fallback scenario should be implemented in a custom deerializer. Here, for value-only deserializers keys are ignored, and for key-value deserializers specific schema should be provided (default scenario).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still see not used fallbackKeyAsStringDeserializer value, why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, removed unused object.

schemaRegistryProvider.deserializationSchemaFactory,
schemaRegistryProvider.recordFormatter,
valueSchemaDeterminer,
keySchemaDeterminer = null,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No no no, we are not using null in scala! Always use Option instead... But in this case IMO simpler would be to create case object FixedNoneSchemaDeterminer. WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Created FixedNoneSchemaDeterminer.

@@ -34,8 +34,8 @@ class ConfluentKafkaAvroDeserializer[T](kafkaConfig: KafkaConfig, schemaData: Ru
KryoGenericRecordSchemaIdSerializationSupport.schemaIdSerializationEnabled(kafkaConfig)

override def deserialize(topic: String, data: Array[Byte]): T = {
val record = deserialize(topic, isKey, data, schemaData)
record.asInstanceOf[T]
val payloadObj = deserialize(topic, isKey, data, schemaData)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not always kafka payload. It can be used also for isKey = true.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neither it is a "record", which is somewhat misleading. Changed name to neutral deserializedData (to match parameter data).

Copy link
Contributor

@lciolecki lciolecki left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder about that can we right now choose key schema version on source / sink?

* use other deserialization strategy base on it or provide different TypeInformation
* @return KafkaDeserializationSchema
*/
def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't like this formatting :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@@ -34,3 +35,8 @@ object RuntimeSchemaData {
}

class SchemaDeterminerError(message: String, cause: Throwable) extends RuntimeException(message, cause)

case object FixedNoneSchemaDeterminer extends AvroSchemaDeterminer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we don't need FixedNoneSchemaDeterminer but FixedStringSchemaDeterminer, because at the end this schema will be used to determine type of key. Also I would add here a description that this determiner is because we want to have one generic source that will handle both situations: key with defined schema in registry and key as fixed string type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to FixedStringSchemaDeterminer (have not removed, it's used in Specific source)

@gskrobisz gskrobisz force-pushed the improvements/kafka_avro_kv_deserializer_classTag_param branch from 1c4f8e9 to 2985580 Compare April 13, 2021 13:01
@gskrobisz gskrobisz force-pushed the improvements/kafka_avro_kv_deserializer_classTag_param branch from 2985580 to 2c87706 Compare April 13, 2021 13:04
Copy link
Member

@arkadius arkadius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job, let's merge it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Source should be able to control start variables
3 participants