Add kafka source with additional variable with metadata. #1512

Merged: 10 commits merged on May 18, 2021


gskrobisz (Member)

No description provided.

gskrobisz linked an issue on Apr 8, 2021 that may be closed by this pull request
gskrobisz force-pushed the feature/kafka_source_with_metadata branch 3 times, most recently from 997596f to 444c8b4 (April 12, 2021 13:32)
gskrobisz changed the base branch from improvements/flinksource_api_refactoring to staging on April 12, 2021 13:33
gskrobisz changed the title "Add kafka source that allows additional variable with metadata." to "Add kafka source with additional variable with metadata." on Apr 12, 2021
TypedObjectTypingResult(fields(keyTypingResult), objType(keyTypingResult))
}

private def fields(keyTypingResult: typing.TypingResult): Map[String, TypingResult] = {
Collaborator

Maybe we could just introduce a case class here?

Member

We already have the InputMeta case class. The problem here is that we mix the type of the key with primitive fields. IMO this approach is fine for solving that problem. This companion object is in the same file as the class, so the probability of desynchronization is minimal.

Member Author

Fixed (and introduced the SerializableConsumerRecord case class).

gskrobisz force-pushed the feature/kafka_source_with_metadata branch 3 times, most recently from eca25b1 to 8a024a9 (May 5, 2021 11:49)

def from[K, V](topic: String, record: SerializableConsumerRecord[K, V]): ConsumerRecord[K, V] = {
createConsumerRecord(
record.topic.getOrElse(topic),
Member

It is counter-intuitive that the topic stored in the record (record.topic) has higher priority than the topic passed as an argument. Why is this method needed? Why not just use serializableConsumerRecord.copy(topic = Some(otherTopic)).toConsumerRecord?

Member

Also, I now see that this method misled me. Look at the full invocation: reading SerializableConsumerRecord.from(....), you would expect the result to be a SerializableConsumerRecord, not a Kafka ConsumerRecord.

Member Author

Fixed according to our discussion.


object SerializableConsumerRecord {

def createConsumerRecord[K, V](topic: String, partition: Int, offset: Long, timestamp: Long, key: K, value: V, headers: Headers): ConsumerRecord[K, V] = {
Member

Am I right that it is only used on data extracted from SerializableConsumerRecord? If so, why not just have a toKafkaConsumerRecord method inside SerializableConsumerRecord?

Member Author

Fixed, according to our discussion.
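One way to address both review points above, sketched under assumptions: the conversion becomes a method on the case class, so the call site shows the result type, and topic precedence is stated in the parameter name, with `copy(topic = Some(...))` as the explicit override the reviewer suggested. `KafkaRecord` is a hypothetical stand-in for Kafka's `ConsumerRecord` (to keep the example dependency-free), and the fields and defaults of `SerializableConsumerRecord` here are assumptions, not the merged code.

```scala
// Hypothetical stand-in for org.apache.kafka.clients.consumer.ConsumerRecord,
// reduced to the fields used in this sketch.
final case class KafkaRecord[K, V](
    topic: String,
    partition: Int,
    offset: Long,
    timestamp: Long,
    key: K,
    value: V,
    headers: Map[String, String])

// Hypothetical shape of SerializableConsumerRecord: only the value is mandatory,
// all metadata is optional (e.g. when test records are written by hand).
final case class SerializableConsumerRecord[K, V](
    key: Option[K],
    value: V,
    topic: Option[String] = None,
    partition: Option[Int] = None,
    offset: Option[Long] = None,
    timestamp: Option[Long] = None,
    headers: Option[Map[String, String]] = None) {

  // The conversion is a method on the case class, so `record.toKafkaConsumerRecord(...)`
  // makes the result type obvious at the call site.
  // The parameter name states the precedence: the record's own topic wins and
  // `fallbackTopic` is used only when the record has none.
  def toKafkaConsumerRecord(fallbackTopic: String): KafkaRecord[K, V] =
    KafkaRecord[K, V](
      topic = topic.getOrElse(fallbackTopic),
      partition = partition.getOrElse(0),         // assumed default
      offset = offset.getOrElse(0L),              // assumed default
      timestamp = timestamp.getOrElse(0L),        // assumed default
      key = key.getOrElse(null.asInstanceOf[K]),  // Kafka record keys may be null
      value = value,
      headers = headers.getOrElse(Map.empty[String, String]))
}
```

With this shape there is no separate `from(topic, record)` factory to misread: forcing a topic is an explicit `copy` before the conversion.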

gskrobisz force-pushed the feature/kafka_source_with_metadata branch 2 times, most recently from 4e830f2 to 787efed (May 11, 2021 15:03)
- current `RecordFormater` should be sufficient for value-only serialization, or use `ConsumerRecordToJsonFormatter` for metadata serialization
- provide timestampAssigner that is able to extract time from `ConsumerRecord[K, V]`
Also:
- removed `BaseKafkaSourceFactory` with multiple topics support: use `KafkaSourceFactory` instead, see test "source with two input topics"
Member

Can you fix the order? Should it be: see the "source with two input topics" test case?

Member Author

Fixed.
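The migration note above asks for a timestampAssigner that reads time from `ConsumerRecord[K, V]` rather than from the value alone. A minimal sketch of the two options, assuming a local `TimestampAssigner` trait that mirrors the shape of Flink's `TimestampAssigner#extractTimestamp(element, recordTimestamp)` and a hypothetical `Record` stand-in for the Kafka record; neither is Nussknacker's actual API.

```scala
// Hypothetical stand-in for Kafka's ConsumerRecord, reduced to key, value and timestamp.
final case class Record[K, V](key: K, value: V, timestamp: Long)

// Local trait mirroring the shape of Flink's
// org.apache.flink.api.common.eventtime.TimestampAssigner#extractTimestamp.
trait TimestampAssigner[T] extends Serializable {
  def extractTimestamp(element: T, recordTimestamp: Long): Long
}

// Event time taken from the record metadata (the Kafka record timestamp).
class RecordTimestampAssigner[K, V] extends TimestampAssigner[Record[K, V]] {
  override def extractTimestamp(element: Record[K, V], recordTimestamp: Long): Long =
    element.timestamp
}

// Event time taken from the deserialized value, via a caller-supplied function.
class ValueTimestampAssigner[K, V](extract: V => Long) extends TimestampAssigner[Record[K, V]] {
  override def extractTimestamp(element: Record[K, V], recordTimestamp: Long): Long =
    extract(element.value)
}
```

Because the assigner receives the whole record, a value-only assigner remains expressible as a thin wrapper, while metadata-based time needs no change to the value type.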

gskrobisz force-pushed the feature/kafka_source_with_metadata branch from a449a8e to f19eb86 (May 14, 2021 11:47)
arkadius (Member) left a comment

LGTM. I have some comments on the type info detection, because I hadn't looked into it so far; maybe @mproch could also take a look at it.

/**
* Trait that allows for providing more detailed TypeInformation when TypingResult is known.
*/
trait TypeInformationDetectionForTypingResult extends TypeInformationDetection {
Member

1. Why not merge it with TypeInformationDetection?
2. Maybe it could have a better name, because now it looks like it only detects the type for a typing result (and not for the context and so on).

Collaborator

👍 If it extends TypeInformationDetection, maybe it would be better to move this method to forType?

Member Author

Fixed. Removed TypeInformationDetectionForTypingResult; the forType method was moved to TypeInformationDetection.
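A sketch of the resulting shape, with a single trait that exposes `forType` next to a class-based method instead of a separate sub-trait. `TypingResult`, `TypeInfo` and `forClass` below are simplified hypothetical stand-ins (the real Nussknacker and Flink types are much richer), and the string descriptions only illustrate that a `TypingResult` can carry more detail than a `Class`.

```scala
// Hypothetical stand-ins: Nussknacker's TypingResult hierarchy and Flink's
// TypeInformation are far richer than this.
sealed trait TypingResult
final case class Typed(klass: Class[_]) extends TypingResult
final case class TypedObject(fields: Map[String, TypingResult]) extends TypingResult
case object Unknown extends TypingResult

final case class TypeInfo(description: String)

// One trait: the typing-result-aware method lives next to the class-based one,
// instead of in a separate sub-trait.
trait TypeInformationDetection {
  def forClass(klass: Class[_]): TypeInfo
  def forType(typingResult: TypingResult): TypeInfo
}

object SimpleTypeInformationDetection extends TypeInformationDetection {
  override def forClass(klass: Class[_]): TypeInfo = TypeInfo(klass.getSimpleName)

  // Uses the extra structure of a TypingResult when it is available
  // and falls back to a generic description for Unknown.
  override def forType(typingResult: TypingResult): TypeInfo = typingResult match {
    case Typed(klass) => forClass(klass)
    case TypedObject(fields) =>
      val described = fields.toList.sortBy(_._1).map { case (name, t) =>
        s"$name: ${forType(t).description}"
      }
      TypeInfo(described.mkString("{", ", ", "}"))
    case Unknown => TypeInfo("generic Object")
  }
}
```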

/**
* Customisation for TypeInformationDetection that provides type information for BaseInputMeta.
*/
class InputMetaAwareTypeInformationCustomisation extends TypingResultAwareTypeInformationCustomisation {
Member

Shouldn't it be used in the generic/demo/dev model?

Collaborator

👍 Currently it does not seem to be used anywhere. Shouldn't we at least register it with ServiceLoader?

Member Author

Fixed. See META-INF/services in flinkKafkaUtil and process. An example test scenario is in InputMetaDeserializationSpec.
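Registration through `java.util.ServiceLoader` works by listing implementations in a provider-configuration file: for instance, a file `META-INF/services/com.example.TypeInformationCustomisation` containing the line `com.example.InputMetaCustomisation` (both names are placeholders, not the real package names). A minimal sketch of the loading side, assuming a hypothetical `TypeInformationCustomisation` SPI trait:

```scala
import java.util.ServiceLoader
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the customisation SPI (not Nussknacker's actual trait).
// Implementations need a public no-argument constructor and are listed, one fully
// qualified class name per line, in META-INF/services/<fully qualified name of this trait>.
trait TypeInformationCustomisation {
  def name: String
}

object TypeInformationCustomisationLoader {
  // Returns every implementation registered via META-INF/services that is visible
  // to the given class loader (an empty list if none is registered).
  def load(classLoader: ClassLoader): List[TypeInformationCustomisation] = {
    val found = ListBuffer.empty[TypeInformationCustomisation]
    val it = ServiceLoader.load(classOf[TypeInformationCustomisation], classLoader).iterator()
    while (it.hasNext) found += it.next()
    found.toList
  }
}
```

Without such a file on the class path the loader returns an empty list, so an unregistered customisation is silently ignored.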

* @tparam K - type of key of deserialized ConsumerRecord
* @tparam V - type of value of deserialized ConsumerRecord
*/
@silent("deprecated")
Collaborator

What deprecation warnings are these? Can we do something to avoid them?

Member Author

Fixed. ConsumerRecord marks the checksum field as deprecated. We do not need this field, so I replaced it with the default NULL_CHECKSUM value.

Development

Successfully merging this pull request may close these issues.

Source should be able to control start variables
3 participants