Add an Avro version of the airline.skyone and airline.sunset Kafka topics. #463

Closed
j3-signalroom opened this issue Nov 17, 2024 · 9 comments · Fixed by #464
Labels
enhancement New feature or request

Comments

@j3-signalroom
Owner

No description provided.

@j3-signalroom j3-signalroom added the enhancement New feature or request label Nov 17, 2024
@j3-signalroom j3-signalroom self-assigned this Nov 17, 2024
@j3-signalroom j3-signalroom moved this to In Progress in J3 Public Projects Nov 17, 2024
j3-signalroom added a commit that referenced this issue Nov 17, 2024
@j3-signalroom j3-signalroom changed the title Use Avro instead of JSON serializer for the airline.skyone and airline.sunset Kafka topics. Add an Avro version of the airline.skyone and airline.sunset Kafka topics. Nov 17, 2024
@j3-signalroom
Owner Author

The Avro versions of the Kafka topics are airline.skyone_avro and airline.sunset_avro.

@j3-signalroom
Owner Author

2024-11-17 14:46:31
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:311)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:90)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:40)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:83)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:34)
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
	at org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(IteratorSourceReaderBase.java:111)
	at org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader.pollNext(RateLimitedSourceReader.java:69)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.UnsupportedOperationException
	at java.base/java.util.Collections$UnmodifiableCollection.add(Collections.java:1067)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	... 35 more

j3-signalroom added a commit that referenced this issue Nov 17, 2024
@j3-signalroom
Owner Author

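The exception com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException is thrown while Flink copies an Avro GenericData.Record from the source to the next chained operator (KryoSerializer.copy, called from CopyingChainingOutput). The presence of KryoSerializer indicates that Flink has no Avro-specific TypeInformation for the record, so it treats the record as a generic type and falls back to Kryo, which serializes the record field by field, including the org.apache.avro.Schema the record references. The failure happens on Kryo's read (deserialization) path: Kryo's CollectionSerializer recreates a collection that Avro stores as read-only (a Collections.unmodifiable* wrapper, visible as java.util.Collections$UnmodifiableCollection.add in the trace) and then calls add() on it, which that wrapper never allows.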
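A minimal, JDK-only sketch of the failing read path, assuming (as the frames CollectionSerializer.read and Collections$UnmodifiableCollection.add in the trace suggest) that the collection serializer creates an empty collection of the same kind and then calls add() for each element. GenericCollectionCopy and copyByAdding are hypothetical names; this models the behavior, it is not Kryo's code.

```java
import java.util.Collection;
import java.util.function.Supplier;

/**
 * Hypothetical model of a generic collection serializer's read path:
 * create an empty collection of the same kind, then add() every element.
 * It mirrors CollectionSerializer.read -> UnmodifiableCollection.add from
 * the trace; it is not Kryo's implementation.
 */
final class GenericCollectionCopy {

    private GenericCollectionCopy() {}

    /**
     * Copies source into a fresh collection obtained from newEmptyOfSameKind.
     * If the factory returns an unmodifiable wrapper, add() throws
     * UnsupportedOperationException as soon as there is an element to copy.
     */
    static <T> Collection<T> copyByAdding(Collection<T> source,
                                          Supplier<Collection<T>> newEmptyOfSameKind) {
        Collection<T> target = newEmptyOfSameKind.get();
        for (T element : source) {
            target.add(element); // fails for Collections.unmodifiable* targets
        }
        return target;
    }
}
```

Copying into a HashSet succeeds, while copying into an empty Collections.unmodifiableSet(...) throws UnsupportedOperationException, matching the "Caused by" section of the trace.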

Possible Causes and Solutions:

  1. Immutable Collections:

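    • The error trace shows java.util.Collections$UnmodifiableCollection.add, which means Kryo is deserializing into a collection wrapped with one of the Collections.unmodifiable*() methods. The serialization trace places that collection in the reserved field of an org.apache.avro.Schema$Field object, i.e. inside Avro's own schema classes rather than in application code.
    • Solution: Because the collection belongs to Avro, replacing it with a mutable collection is not an option. A custom Kryo serializer for unmodifiable collections could work around it, but the more robust fix is to keep Kryo away from Avro objects entirely (see item 2).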
  2. Unsupported Avro Object Serialization:

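    • The serialization trace (reserved, fieldMap, schema) shows that Kryo is not only serializing the record's values but also walking the org.apache.avro.Schema that every GenericData.Record references. Avro's schema classes are not meant to be rebuilt field by field through reflection, so this path is fragile, and it is also expensive because the schema is copied along with every record.
    • Solution: Give Flink Avro type information so that it uses its Avro serializer (AvroSerializer, from the flink-avro module) instead of Kryo. For GenericRecord, pass new GenericRecordAvroTypeInfo(schema) wherever the DataStream API accepts a TypeInformation (for example, the source's type-information argument or .returns(...)). For Avro-generated classes that extend SpecificRecordBase, Flink's type extraction picks Avro type information automatically when flink-avro is on the classpath.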
  3. Custom Kryo Serializer:

    • You may need to create and register a custom Kryo serializer for the problematic data types.
    • Example:
      env.getConfig().addDefaultKryoSerializer(MyClass.class, new MyCustomSerializer());
    • This approach can help if you have specific needs for serializing and deserializing your objects.
  4. Flink Configuration for Serialization:

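    • To make sure Kryo is never used silently, configure Flink to fail fast whenever a type would fall back to Kryo (a "generic type"), or explicitly register types with Kryo for better control.
    • Example of disabling the Kryo fallback (the equivalent configuration option is pipeline.generic-types: false):
      env.getConfig().disableGenericTypes();
    • The job then fails with an error naming the offending type, which shows where Avro (or POJO) type information is still missing. Note that enableGenericTypes() does the opposite: it allows the Kryo fallback, which is already the default.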
  5. Avoiding Complex Structures in State:

    • Consider simplifying data structures passed between operators or kept in state, if feasible. Nested or complex objects can lead to serialization challenges, especially when third-party objects (e.g., Avro records with their embedded schemas) end up being handled by Kryo instead of a serializer designed for them.
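To illustrate items 1 and 3, here is a JDK-only sketch of what a type-specific serializer for an unmodifiable set does differently from the generic path: it reads the elements into a mutable HashSet and wraps the finished set with Collections.unmodifiableSet() only at the end. UnmodifiableStringSetCodec and its wire format (an element count followed by UTF strings) are hypothetical; a real Kryo serializer would extend com.esotericsoftware.kryo.Serializer, and for Avro records item 2 remains the preferred fix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical type-specific codec for an unmodifiable Set<String>.
 * Wire format (illustrative only): element count, then each element as modified UTF-8.
 */
final class UnmodifiableStringSetCodec {

    private UnmodifiableStringSetCodec() {}

    static byte[] write(Set<String> set) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(set.size());
            for (String element : set) {
                out.writeUTF(element);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    static Set<String> read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            Set<String> mutable = new HashSet<>();          // 1. rebuild in a mutable set
            for (int i = 0; i < size; i++) {
                mutable.add(in.readUTF());
            }
            return Collections.unmodifiableSet(mutable);    // 2. restore the read-only view last
        } catch (IOException e) {
            throw new UncheckedIOException(e);              // e.g. truncated input
        }
    }
}
```

The design choice that matters is the order: the collection is complete before it becomes read-only, so add() is never called on the unmodifiable wrapper.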

Immediate Steps:

  1. Investigate and Modify Data Structure: Check your data structure, especially objects such as org.apache.avro.Schema$Field that appear in the serialization trace, to see whether they contain immutable collections or unsupported complex objects.
  2. Switch Serializer: Use a serializer compatible with the data structure (e.g., Avro-specific serializer for Avro objects).
  3. Custom Serializer: Create and register custom serializers if you need to work with objects that have complex serialization requirements.
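For step 1, a rough JDK-only diagnostic can walk an object graph roughly the way a reflection-based serializer would and list every place that holds a JDK unmodifiable or immutable collection. ImmutableCollectionFinder is a hypothetical helper, not part of Flink, Kryo, or Avro: it follows instance fields of non-JDK classes (including superclasses), descends into collection elements and map values, skips arrays, and relies on trySetAccessible(), so it only inspects classes whose fields are accessible (for example, classes on the classpath).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical diagnostic: reports field paths that hold JDK unmodifiable/immutable containers. */
final class ImmutableCollectionFinder {

    private ImmutableCollectionFinder() {}

    static List<String> find(Object root) {
        List<String> hits = new ArrayList<>();
        Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        String rootName = root == null ? "" : root.getClass().getSimpleName();
        walk(root, rootName, visited, hits);
        return hits;
    }

    private static void walk(Object obj, String path, Set<Object> visited, List<String> hits) {
        if (obj == null || !visited.add(obj)) {
            return; // nothing to inspect, or already seen (guards against cycles)
        }
        if (isImmutableJdkContainer(obj.getClass())) {
            hits.add(path + " -> " + obj.getClass().getName());
        }
        if (obj instanceof Collection) {
            int index = 0;
            for (Object element : (Collection<?>) obj) {
                walk(element, path + "[" + index++ + "]", visited, hits);
            }
            return;
        }
        if (obj instanceof Map) {
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) obj).entrySet()) {
                walk(entry.getValue(), path + "[" + entry.getKey() + "]", visited, hits);
            }
            return;
        }
        // Follow instance fields of non-JDK classes, including inherited ones.
        // Arrays and JDK-internal fields are not inspected in this sketch.
        for (Class<?> c = obj.getClass(); c != null && !isJdkClass(c); c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (Modifier.isStatic(field.getModifiers())
                        || field.getType().isPrimitive()
                        || !field.trySetAccessible()) {
                    continue;
                }
                try {
                    walk(field.get(obj), path + "." + field.getName(), visited, hits);
                } catch (IllegalAccessException e) {
                    // not expected after trySetAccessible() returned true; skip the field
                }
            }
        }
    }

    private static boolean isJdkClass(Class<?> c) {
        String name = c.getName();
        return name.startsWith("java.") || name.startsWith("javax.")
                || name.startsWith("jdk.") || name.startsWith("sun.");
    }

    private static boolean isImmutableJdkContainer(Class<?> c) {
        String name = c.getName();
        return name.startsWith("java.util.Collections$Unmodifiable")
                || name.startsWith("java.util.ImmutableCollections$");
    }
}
```

Applied to a GenericData.Record, one would expect hits along the path from the serialization trace (schema, then fieldMap, then reserved), possibly among others; the exact output depends on the Avro version.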

@j3-signalroom
Owner Author

The code should not use Kryo serialization. The data should be serialized in Avro.

@j3-signalroom j3-signalroom modified the milestone: Release 1.00.00.000 Nov 17, 2024
@j3-signalroom
Owner Author

j3-signalroom commented Nov 17, 2024

Deprecated. Registering data types and serializers in code is deprecated, because the code has to be modified when upgrading the job version. Configure this through the config option PipelineOptions.SERIALIZATION_CONFIG instead.

Registers the given type with a Kryo serializer.

@j3-signalroom
Owner Author

Use PipelineOptions.SERIALIZATION_CONFIG (configuration key pipeline.serialization-config) instead.

@j3-signalroom
Owner Author

@j3-signalroom j3-signalroom linked a pull request Nov 18, 2024 that will close this issue
@github-project-automation github-project-automation bot moved this from In Progress to Done in J3 Public Projects Nov 18, 2024
@j3-signalroom j3-signalroom reopened this Nov 18, 2024
@j3-signalroom
Owner Author

See Issue #469
