Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race in FlinkTypeInfoRegistrar #7733

Merged
merged 1 commit into from
Mar 24, 2025
Merged

Conversation

piotrp
Copy link
Member

@piotrp piotrp commented Mar 23, 2025

Describe your changes

Fixes race found on customer's environment with two Flink deployment managers:

20:57:07.486 [nussknacker-designer-pekko.actor.default-dispatcher-7] ERROR p.t.n.ui.api.NuDesignerErrorToHttp$ - Unknown error: A TypeInfoFactory for type 'class java.time.LocalTime' is already registered.
org.apache.flink.api.common.functions.InvalidTypesException: A TypeInfoFactory for type 'class java.time.LocalTime' is already registered.
        at org.apache.flink.api.java.typeutils.TypeExtractor.registerFactory(TypeExtractor.java:164)
        at pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkTypeInfoRegistrar$.register(FlinkTypeInfoRegistrar.scala:45)
        at pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkTypeInfoRegistrar$.$anonfun$ensureTypeInfosAreRegistered$1(FlinkTypeInfoRegistrar.scala:33)
        at pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkTypeInfoRegistrar$.$anonfun$ensureTypeInfosAreRegistered$1$adapted(FlinkTypeInfoRegistrar.scala:32)
        at scala.collection.immutable.List.foreach(List.scala:334)
        at pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkTypeInfoRegistrar$.ensureTypeInfosAreRegistered(FlinkTypeInfoRegistrar.scala:32)
        at pl.touk.nussknacker.engine.process.ExecutionConfigPreparer$SerializationPreparer.prepareExecutionConfig(ExecutionConfigPreparer.scala:91)
        at pl.touk.nussknacker.engine.process.ExecutionConfigPreparer$$anon$1.$anonfun$prepareExecutionConfig$1(ExecutionConfigPreparer.scala:45)
        at pl.touk.nussknacker.engine.process.ExecutionConfigPreparer$$anon$1.$anonfun$prepareExecutionConfig$1$adapted(ExecutionConfigPreparer.scala:45)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
        at pl.touk.nussknacker.engine.process.ExecutionConfigPreparer$$anon$1.prepareExecutionConfig(ExecutionConfigPreparer.scala:45)
        at pl.touk.nussknacker.engine.process.registrar.DefaultStreamExecutionEnvPreparer.preRegistration(StreamExecutionEnvPreparer.scala:61)
        at customer.namespace.EspFlinkCompatibilityProvider$$anon$1.preRegistration(EspFlinkCompatibilityProvider.scala:26)
        at pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar.$anonfun$register$1(FlinkProcessRegistrar.scala:83)
        at pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar.$anonfun$register$1$adapted(FlinkProcessRegistrar.scala:78)
        at pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar.$anonfun$usingRightClassloader$1(FlinkProcessRegistrar.scala:106)
        at apply @ pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunner.$anonfun$runTests$6(FlinkMiniClusterScenarioTestRunner.scala:68)
        at apply$extension @ pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunner.$anonfun$runTests$6(FlinkMiniClusterScenarioTestRunner.scala:68)
        at flatMap @ pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunner.$anonfun$runTests$6(FlinkMiniClusterScenarioTestRunner.scala:68)
        at apply @ pl.touk.nussknacker.engine.testmode.ResultsCollectingListenerHolder$.registerTestEngineListener(ResultsCollectingListener.scala:110)
        at main$ @ pl.touk.nussknacker.ui.NussknackerApp$.main(NussknackerApp.scala:8)
        at main$ @ pl.touk.nussknacker.ui.NussknackerApp$.main(NussknackerApp.scala:8)
        at main$ @ pl.touk.nussknacker.ui.NussknackerApp$.main(NussknackerApp.scala:8)

Checklist before merge

  • Related issue ID is placed at the beginning of PR title in [brackets] (can be GH issue or Nu Jira issue)
  • Code is cleaned from temporary changes and commented out lines
  • Parts of the code that are not easy to understand are documented in the code
  • Changes are covered by automated tests
  • Showcase in dev-application.conf added to demonstrate the feature
  • Documentation added or updated
  • Added entry in Changelog.md describing the change from the perspective of a public distribution user
  • Added MigrationGuide.md entry in the appropriate subcategory if introducing a breaking change
  • Verify that PR will be squashed during merge

Sorry, something went wrong.

@piotrp piotrp force-pushed the fix-FlinkTypeInfoRegistrar-race branch from 7f500f1 to 25c52fe Compare March 24, 2025 16:03
@piotrp piotrp merged commit 2314ca7 into staging Mar 24, 2025
18 checks passed
@piotrp piotrp deleted the fix-FlinkTypeInfoRegistrar-race branch March 24, 2025 19:04
piotrp added a commit that referenced this pull request Mar 24, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants