Savepoint deserialization fixup - The class is an inner class, but not statically accessible. #7270
Conversation
📝 Walkthrough
The changes in this pull request primarily focus on simplifying the handling of type information for Java and Scala maps within the Flink executor.
Possibly related PRs
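The error quoted in the PR title ("The class is an inner class, but not statically accessible") stems from a JVM distinction: a non-static inner class's constructor takes a hidden reference to its outer instance, so frameworks that instantiate classes reflectively through a no-arg constructor (as Flink does when restoring serializer snapshots from a savepoint) cannot create it. A minimal Java sketch of the failure mode, with hypothetical class names:

```java
public class InnerClassDemo {
    class Inner {}            // non-static inner class: constructor needs the outer instance
    static class Nested {}    // static nested class: reflectively instantiable

    public static void main(String[] args) throws Exception {
        // Works: a genuine no-arg constructor with no hidden outer-instance parameter.
        Object ok = Nested.class.getDeclaredConstructor().newInstance();
        System.out.println(ok != null);

        // Fails: Inner's constructor secretly takes an InnerClassDemo parameter,
        // so looking up a no-arg constructor throws NoSuchMethodException.
        try {
            InnerClassDemo.Inner.class.getDeclaredConstructor().newInstance();
            System.out.println("instantiated");
        } catch (NoSuchMethodException e) {
            System.out.println("not statically accessible");
        }
    }
}
```

This is why the anonymous class `TypedScalaMapSerializer$$anon$1` could not be restored: anonymous classes behave like non-static inner classes, and the fix replaces them with statically accessible classes.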
📜 Recent review details
Configuration used: CodeRabbit UI
📒 Files selected for processing (1)
🔇 Additional comments (1)
docs/Changelog.md (1)
The changelog entry clearly describes the issue that was fixed and its impact. It is correctly placed in the version 1.18.1 section and follows the changelog format.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (5)
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala (1)
Line range hint 35-45: Consider documenting the serialization format

Since TypedJavaMapSerializerSnapshot is now final and handles critical serialization logic, consider adding documentation about the serialization format and compatibility guarantees.
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala (1)
Line range hint 46-56: Consider adding migration documentation

Since TypedScalaMapSerializerSnapshot is now final, consider documenting the migration path for any existing custom implementations that might have extended the previously abstract class.
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala (2)
Line range hint 82-95: Document Flink version compatibility

The FlinkBelow119AdditionalTypeInfo extractor is marked for removal when Flink < 1.19 support is dropped. Consider adding this information to the changelog and migration guide.
Based on the search results, I can now provide a final response since we have clear evidence about the TODOs and their current implementation status.
TODOs represent actual gaps in type handling implementation
The search results confirm that these TODOs represent real gaps in the codebase's type handling capabilities:
- Avro types are not handled (with reference to FlinkConfluentUtils)
- Map implementations are only partially handled (mainly HashMap)
- Scala case classes are handled via Kryo serialization instead of proper CaseClassTypeInfo
- Union types handling is incomplete
The TODOs are not just documentation artifacts but reflect actual limitations in the type system implementation, as evidenced by the comments and surrounding code context.
🔗 Analysis chain
Line range hint 28-33: Consider addressing TODOs before release

Several TODO comments indicate pending work:
- Handle avro types
- Better handle specific map implementations
- Handle scala case classes
- Handle union types
These might affect type handling completeness.
Let me analyze the results and generate additional verification scripts to check the actual usage of these types in the codebase.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for actual usage of mentioned types
echo "Checking for Avro type usage..."
rg -l "avro"
echo "Checking for specific Map implementations..."
ast-grep --pattern 'new HashMap|new TreeMap|new LinkedHashMap'
echo "Checking for case class usage..."
rg -l "case class"

Length of output: 66019
Script:
#!/bin/bash
# Check for TODO comments in the file
echo "Checking TODOs in the file..."
rg -A 2 "TODO" engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala

# Check for actual implementations of mentioned type handling
echo "Checking Avro type handling implementation..."
ast-grep --pattern 'def $methodName($$$): $returnType = { $$$ case $avroType: $$$ $$$ }'

# Check for Map type handling
echo "Checking Map type handling implementation..."
ast-grep --pattern 'def $methodName($$$): $returnType = { $$$ case $mapType: Map[$_, $_] => $$$ $$$ }'

# Check for case class handling
echo "Checking case class handling implementation..."
ast-grep --pattern 'def $methodName($$$): $returnType = { $$$ case $caseClass: $type if $caseClass.isInstanceOf[Product] => $$$ $$$ }'

Length of output: 1897
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala (1)
227-253: Optimize reflection usage by caching Method instances

To improve performance, consider caching the Method instances looked up via constructIntermediateCompatibilityResultMethodName. Repeated calls to getMethod can be expensive, especially if constructIntermediateCompatibilityResultProxied is called frequently.

Apply this diff to cache Method instances:

+ private lazy val newConstructMethod: Option[Method] = try {
+   Some(classOf[CompositeTypeSerializerUtil].getMethod(
+     constructIntermediateCompatibilityResultMethodName,
+     classOf[Array[TypeSerializerSnapshot[_]]],
+     classOf[Array[TypeSerializerSnapshot[_]]]
+   ))
+ } catch {
+   case _: NoSuchMethodException => None
+ }
+ private lazy val oldConstructMethod: Option[Method] = try {
+   Some(classOf[CompositeTypeSerializerUtil].getMethod(
+     constructIntermediateCompatibilityResultMethodName,
+     classOf[Array[TypeSerializer[_]]],
+     classOf[Array[TypeSerializerSnapshot[_]]]
+   ))
+ } catch {
+   case _: NoSuchMethodException => None
+ }
  private def constructIntermediateCompatibilityResultProxied(
      newNestedSerializers: Array[TypeSerializer[_]],
      nestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
  ): IntermediateCompatibilityResult[_] = {
-   try {
-     val newMethod = // existing code
-     // existing code
-   } catch {
-     case _: NoSuchMethodException =>
-       val oldMethod = // existing code
-       // existing code
-   }
+   newConstructMethod.map { method =>
+     method.invoke(null, newNestedSerializers.map(_.snapshotConfiguration()), nestedSerializerSnapshots)
+       .asInstanceOf[IntermediateCompatibilityResult[_]]
+   }.orElse {
+     oldConstructMethod.map { method =>
+       method.invoke(null, newNestedSerializers, nestedSerializerSnapshots)
+         .asInstanceOf[IntermediateCompatibilityResult[_]]
+     }
+   }.getOrElse {
+     throw new RuntimeException("Could not find suitable method for constructIntermediateCompatibilityResult")
+   }
  }
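The version-dependent lookup that constructIntermediateCompatibilityResultProxied performs reduces to a generic JVM pattern: probe for the newer method signature first and fall back to the older one on NoSuchMethodException. A standalone Java sketch of that pattern (the Util class and its signatures are hypothetical stand-ins, not the Flink API):

```java
import java.lang.reflect.Method;

public class ReflectionFallback {

    // Hypothetical stand-in for a utility whose static method changed
    // signature between library versions; only the "old" overload exists here.
    static class Util {
        public static String construct(int[] serializers, int[] snapshots) {
            return "old-signature:" + serializers.length;
        }
    }

    static String constructProxied(int[] serializers, int[] snapshots) throws Exception {
        try {
            // Probe for the newer signature first (different parameter types).
            Method m = Util.class.getMethod("construct", int[][].class, int[].class);
            return (String) m.invoke(null, new int[][] {serializers}, snapshots);
        } catch (NoSuchMethodException e) {
            // Fall back to the pre-change signature.
            Method m = Util.class.getMethod("construct", int[].class, int[].class);
            // Static method: the receiver argument to invoke is null.
            return (String) m.invoke(null, serializers, snapshots);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(constructProxied(new int[] {1, 2}, new int[] {3}));
    }
}
```

The same structure works whether the probe succeeds or falls through, which is what lets one binary run against both old and new library versions.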
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (5)
- engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala (1 hunks)
- engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala (2 hunks)
- engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala (4 hunks)
- engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala (3 hunks)
- engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala (1 hunks)
🔇 Additional comments (11)
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala (2)
8-14: LGTM: TypedJavaMapTypeInformation changes look good

The simplified constructor and createSerializer method improve code maintainability by removing the unnecessary intermediate schema compatibility function.
20-31: Verify serialization compatibility with existing data

The removal of buildIntermediateSchemaCompatibilityResultFunction could affect deserialization of existing data. Please ensure backward compatibility is maintained.
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala (1)
Line range hint 24-31: Performance optimization in deserialize method looks good

The use of Map.newBuilder instead of zipWithIndex.map.toMap for a 10-20% performance improvement is well documented and implemented correctly.
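The pattern behind that optimization, writing entries straight into a pre-sized destination instead of materializing an intermediate indexed collection and converting it, can be illustrated with a small Java sketch (method and field names are hypothetical, not the Nussknacker code):

```java
import java.util.HashMap;
import java.util.Map;

public class OnePassMapBuild {

    // One-pass construction: pre-size the map and insert directly, instead of
    // first building an intermediate list of (index, value) pairs and then
    // converting it to a map (the zipWithIndex.map.toMap shape in Scala).
    static Map<String, Object> buildFields(String[] names, Object[] values) {
        Map<String, Object> result = new HashMap<>(names.length * 2);
        for (int i = 0; i < names.length; i++) {
            result.put(names[i], values[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> m = buildFields(new String[] {"a", "b"}, new Object[] {1, 2});
        System.out.println(m.size());
    }
}
```

Skipping the intermediate collection saves one allocation and one traversal per deserialized record, which adds up on a hot deserialization path.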
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala (1)
101-104: LGTM: Map type information creation simplified

The simplified createScalaMapTypeInformation and createJavaMapTypeInformation methods improve code clarity while maintaining functionality.
engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala (3)
267-271: LGTM! Pattern matching simplified

The simplified pattern matching in assertNested improves code clarity by removing unused parameters while maintaining the same functionality. This change aligns well with the PR's objective of fixing savepoint deserialization.
276-277: LGTM! Consistent pattern matching simplification

The pattern matching simplification in assertMapSerializers maintains consistency with the changes in assertNested, creating a more uniform and cleaner testing approach across the codebase.
267-277: Verify consistent pattern matching across test files

Let's verify if similar pattern matching simplifications should be applied to other test files for consistency.
✅ Verification successful
Pattern matching is isolated to this test file

The pattern matching for TypedScalaMapSerializer is specific to this test file and is not used in any other test files in the codebase. The changes are localized to two helper methods (assertNested and assertMapSerializers) within TypingResultAwareTypeInformationDetectionSpec.scala and don't require similar modifications elsewhere.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for similar pattern matching constructs in test files
# Description: Look for TypedScalaMapSerializer pattern matching in other test files
# Search for TypedScalaMapSerializer usage in test files
rg -l "case.*TypedScalaMapSerializer.*=>" --type scala --glob "*Spec.scala" --glob "*Test.scala"
# Search for specific pattern matching structure
ast-grep --pattern 'case $name($param) =>' --lang scala
Length of output: 740
Script:
#!/bin/bash
# Let's examine the specific file content to understand the pattern matching context
rg "TypedScalaMapSerializer" -A 5 -B 5 engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala
# Let's also search for similar serializer testing patterns
rg "TypeSerializer.*Map.*case" --type scala --glob "*Test.scala" --glob "*Spec.scala" -A 5 -B 5
Length of output: 4568
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala (4)
7-7: Addition of necessary import for IntermediateCompatibilityResult

The import of IntermediateCompatibilityResult is required for handling schema compatibility results in the new method constructIntermediateCompatibilityResultProxied.
130-130: Use of private constant for method name enhances maintainability

Defining constructIntermediateCompatibilityResultMethodName as a private constant improves code readability and maintainability by reducing the risk of typos in method names used during reflection.
132-132: Change serializersSnapshots from val to var to allow reassignment

Changing serializersSnapshots to a protected var is necessary because it is re-initialized in the readSnapshot method. This change ensures that the variable can be reassigned appropriately during deserialization.
172-175: Update compatibility check using the new proxy method

By using constructIntermediateCompatibilityResultProxied, the code accommodates differences in method signatures across Flink versions, ensuring correct schema compatibility checks regardless of the underlying Flink version.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala (1)
Line range hint 13-14: Consider documenting type handling limitations.

The existing comments highlight important limitations regarding type handling, especially for Avro types and Scala case classes. Given the recent changes to map type handling, it would be valuable to:
- Document any new type handling limitations or improvements
- Update the TODO comments if they're affected by these changes
- Consider adding examples of supported type conversions in the comments
Would you like me to help create a documentation section that covers these aspects?
Also applies to: 17-25
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (5)
- engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala (1 hunks)
- engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala (2 hunks)
- engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala (4 hunks)
- engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala (3 hunks)
- engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala
- engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala
- engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala
🔇 Additional comments (7)
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala (4)
7-7: LGTM: Import addition is appropriate

The addition of the IntermediateCompatibilityResult import aligns with the new compatibility handling implementation.
132-132: Good: Improved encapsulation by making serializersSnapshots private

The change from protected to private visibility enhances encapsulation and follows the principle of least privilege, as this variable is only used within this class.
172-175: LGTM: Improved schema compatibility handling

The changes streamline the schema compatibility check while maintaining detailed error reporting. The filtering of serializers and snapshots is now more explicit and easier to follow.
227-253: Enhance error handling in reflection-based compatibility check

While the implementation successfully handles different Flink versions, the error handling could be improved:
- The method only catches NoSuchMethodException, but reflection can throw other exceptions like IllegalAccessException, InvocationTargetException, etc.
- The caught exception is silently ignored, making debugging harder.

Apply this diff to improve error handling:
private def constructIntermediateCompatibilityResultProxied(
newNestedSerializers: Array[TypeSerializer[_]],
nestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
): IntermediateCompatibilityResult[_] = {
try {
val newMethod = classOf[CompositeTypeSerializerUtil].getMethod(
constructIntermediateCompatibilityResultMethodName,
classOf[Array[TypeSerializerSnapshot[_]]],
classOf[Array[TypeSerializerSnapshot[_]]]
)
newMethod
.invoke(null, newNestedSerializers.map(_.snapshotConfiguration()), nestedSerializerSnapshots)
.asInstanceOf[IntermediateCompatibilityResult[_]]
} catch {
- case _: NoSuchMethodException =>
+ case e: NoSuchMethodException =>
+ logger.debug(s"Falling back to old method signature due to: ${e.getMessage}")
val oldMethod = classOf[CompositeTypeSerializerUtil].getMethod(
constructIntermediateCompatibilityResultMethodName,
classOf[Array[TypeSerializer[_]]],
classOf[Array[TypeSerializerSnapshot[_]]]
)
oldMethod
.invoke(null, newNestedSerializers, nestedSerializerSnapshots)
.asInstanceOf[IntermediateCompatibilityResult[_]]
+ case e @ (_: IllegalAccessException | _: InvocationTargetException) =>
+ logger.error("Failed to invoke compatibility check method", e)
+ throw new RuntimeException("Failed to check schema compatibility", e)
}
}
engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala (3)
101-101: Verify inner class accessibility for Scala maps.

Since the PR objectives mention issues with inner class accessibility, we should verify that TypedScalaMapTypeInformation is properly accessible during deserialization.
Let's check the class accessibility:
101-101: Verify savepoint compatibility after removing intermediate compatibility handling.
The simplification of map type information creation by removing the intermediate compatibility handling could impact savepoint compatibility. While the changes make the code cleaner, we should ensure this doesn't break existing savepoints.
Let's verify the usage and potential impact:
Consider documenting this change in the migration guide if it affects savepoint compatibility. Also, ensure there are sufficient tests covering savepoint operations with both old and new type information handling.
Also applies to: 104-104
✅ Verification successful
The initial search revealed several relevant files, but we need to dive deeper into the specific implementation and tests to verify if the changes to map type information handling could affect savepoint compatibility. Let's analyze the test cases and implementation details to make a more informed decision.
Based on the analysis of the provided outputs, I can now generate a final response:
The changes to TypedScalaMapTypeInformation are safe for savepoint compatibility
The implementation shows that TypedScalaMapTypeInformation is a case class that extends TypedObjectBasedTypeInformation and maintains its core serialization behavior. The changes in the review:
- Only simplify the map transformation without altering the underlying type structure
- Don't modify the serialization logic (which is handled by TypedScalaMapSerializer)
- Don't impact the serialVersionUID as no structural changes were made

The StateCompatibilityTest verifies savepoint compatibility across versions, and the current changes don't affect the serialization contract that this test validates.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for savepoint-related tests that might need updates
rg -l "savepoint" --type scala
# Look for any existing serialization compatibility tests
rg -l "SerializationSchema|TypeInformation.*Test|.*SerializerTest" --type scala
# Check for any migration guides or compatibility documentation
fd -e md -e txt -e rst | xargs rg -l "savepoint|migration|compatibility"
Length of output: 7741
Script:
#!/bin/bash
# Let's check the StateCompatibilityTest for insights on savepoint compatibility
cat engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/StateCompatibilityTest.scala
# Check the actual implementation of TypedScalaMapTypeInformation
ast-grep --pattern 'class TypedScalaMapTypeInformation'
# Look for any tests related to TypeInformation
rg -l "TypeInformation.*Test" --type scala
# Check for any existing usages of TypedScalaMapTypeInformation
rg "TypedScalaMapTypeInformation" --type scala
Length of output: 11535
104-104: Verify inner class accessibility for Java maps.

Similarly, verify that TypedJavaMapTypeInformation is properly accessible during deserialization.
Let's check the class accessibility:
✅ Verification successful
Inner class accessibility and deserialization are properly handled
The implementation shows that TypedJavaMapTypeInformation is a top-level case class (not an inner class) with proper serialization support:
- It extends TypedObjectBasedTypeInformation with appropriate type parameters
- Has a dedicated serializer TypedJavaMapSerializer with @SerialVersionUID
- Implements proper serialization snapshot handling via TypedJavaMapSerializerSnapshot
- All classes are public and properly accessible
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Look for inner class usage patterns
ast-grep --pattern 'class $_ { class TypedJavaMapTypeInformation }'
# Check for class loading related tests
rg -l "ClassLoader.*TypedJavaMapTypeInformation" --type scala
Length of output: 133
Script:
#!/bin/bash
# Let's first find the TypedJavaMapTypeInformation class definition
rg "TypedJavaMapTypeInformation" -A 5 --type scala
# Also check for any serialization/deserialization related tests or usage
rg "TypedJavaMapTypeInformation.*[Ss]erializ" --type scala
# Look for any test files that might be using this class
fd "Test.*\.scala" --exec rg "TypedJavaMapTypeInformation" {}
Length of output: 3348
Script:
#!/bin/bash
# Let's examine the TypedObjectBasedTypeInformation class which TypedJavaMapTypeInformation extends
rg -A 10 "class TypedObjectBasedTypeInformation" --type scala
# Check for any serialization tests of TypedObjectBasedTypeInformation
fd "Test.*\.scala" --exec rg "TypedObjectBasedTypeInformation.*[Ss]erializ" {}
# Look at the full implementation of TypedJavaMapTypeInformation
cat engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala
Length of output: 4019
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
docs/Changelog.md (1)
24-24: LGTM! Consider adding more details about the fix.

The changelog entry clearly describes the issue and its impact. However, it would be helpful to add:
- The root cause of the issue (inner class not being statically accessible)
- Any potential migration steps if users need to take action
LGTM, but before merge let's check if this approach works on 1.18 as well
…t statically accessible. (#7270) * Savepoint deserialization fixup - The class 'pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedScalaMapSerializer$$anon$1' is not instantiable: The class is an inner class, but not statically accessible. * Changelog entry added
Describe your changes
Checklist before merge
Summary by CodeRabbit
New Features
- TypingResult information for dictionaries.
- TypeInformation.
Bug Fixes
Tests