From 9d4f3814cc78c57cd0e3ff972a14fd487ccb7b99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Cio=C5=82ecki?= Date: Tue, 26 Nov 2024 16:15:50 +0100 Subject: [PATCH 1/7] Fix: ToJsonEncoder keeps order fields during encoding map (#7237) --- docs/Changelog.md | 9 +++++---- .../engine/util/json/ToJsonEncoder.scala | 10 ++++------ .../engine/util/json/ToJsonEncoderSpec.scala | 16 ++++++++++++++++ 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index a215a548ede..b323ce6b6e6 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -18,6 +18,11 @@ * Flink upgrade to 1.19.1. Note: it is possible to use Nussknacker with older versions of Flink, but it requires some extra steps. See [Migration guide](MigrationGuide.md) for details. * Performance optimisations of the serialisation of events passing through Flink's `DataStream`s. +### 1.18.1 (Not released yet) + +* [#7207](https://github.com/TouK/nussknacker/pull/7207) Fixed minor clipboard, keyboard and focus related bugs +* [#7237](https://github.com/TouK/nussknacker/pull/7237) Fix: ToJsonEncoder keeps order fields during encoding map + ### 1.18.0 (22 November 2024) * [#6944](https://github.com/TouK/nussknacker/pull/6944) [#7166](https://github.com/TouK/nussknacker/pull/7166) Changes around adhoc testing feature @@ -110,10 +115,6 @@ * [#7190](https://github.com/TouK/nussknacker/pull/7190) Fix "Failed to get node validation" when opening fragment node details for referencing non-existing fragment * [#7215](https://github.com/TouK/nussknacker/pull/7215) Change typing text to spinner during validation and provide delayed adding on enter until validation finishes in a scenario labels and fragment input -### 1.18.1 (Not released yet) - -* [#7207](https://github.com/TouK/nussknacker/pull/7207) Fixed minor clipboard, keyboard and focus related bugs - ## 1.17 #### Highlights diff --git a/utils/utils/src/main/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoder.scala b/utils/utils/src/main/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoder.scala index a57223fdd4c..649c777e593 100644 --- a/utils/utils/src/main/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoder.scala +++ b/utils/utils/src/main/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoder.scala @@ -7,10 +7,7 @@ import java.time.format.DateTimeFormatter import io.circe.{Encoder, Json} import io.circe.Json._ import pl.touk.nussknacker.engine.api.DisplayJson -import pl.touk.nussknacker.engine.util.Implicits._ - import java.util.ServiceLoader - import java.util.UUID import scala.jdk.CollectionConverters._ @@ -89,9 +86,10 @@ case class ToJsonEncoder( // toString on keys. private def encodeMap(map: Map[_, _]) = { val mapWithStringKeys = map.view.map { case (k, v) => - k.toString -> v - }.toMap - fromFields(mapWithStringKeys.mapValuesNow(encode)) + k.toString -> encode(v) + } + + fromFields(mapWithStringKeys) } } diff --git a/utils/utils/src/test/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoderSpec.scala b/utils/utils/src/test/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoderSpec.scala index 05322f742f2..3602f4bda0f 100644 --- a/utils/utils/src/test/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoderSpec.scala +++ b/utils/utils/src/test/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoderSpec.scala @@ -101,6 +101,22 @@ class ToJsonEncoderSpec extends AnyFunSpec with Matchers { } + it("should convert map to json and keep order of keys") { + val map = ListMap( + "intNumber" -> 42, + "floatNumber" -> 42.42, + "someTimestamp" -> 1496930555793L, + "someString" -> "hello", + "booleanValue" -> true + ) + + val expectedJson = + """{"intNumber":42,"floatNumber":42.42,"someTimestamp":1496930555793,"someString":"hello","booleanValue":true}""" + + // We compare string because we want to check the order + encoder.encode(map).noSpaces shouldBe expectedJson + } + } class CustomJsonEncoderCustomisation1 extends ToJsonEncoderCustomisation { From 648ab35d0dea90f07fb58de5fb125d61c3335fc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Kope=C4=87?= Date: Fri, 29 Nov 2024 12:13:25 +0100 Subject: [PATCH 2/7] Optional requiredParam for UIParameter decoder compatibility (#7265) --- .../pl/touk/nussknacker/restmodel/definition/package.scala | 6 ++++-- .../touk/nussknacker/ui/definition/DefinitionsService.scala | 2 +- docs-internal/api/nu-designer-openapi.yaml | 5 +++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/definition/package.scala b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/definition/package.scala index f28942f62d6..11dcd56935f 100644 --- a/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/definition/package.scala +++ b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/definition/package.scala @@ -50,8 +50,10 @@ package object definition { branchParam: Boolean, hintText: Option[String], label: String, - // This attribute is used only by external project - requiredParam: Boolean, + // This attribute is in use only by the external project and was introduced in the 1.18 version + // The option is for decoder backward compatibility to decode responses from older versions + // The option can be removed in future releases + requiredParam: Option[Boolean], ) @JsonCodec(encodeOnly = true) final case class UIComponentDefinition( diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/definition/DefinitionsService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/definition/DefinitionsService.scala index d9cd34c4017..1bb5980f09f 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/definition/DefinitionsService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/definition/DefinitionsService.scala @@ -170,7 +170,7 @@ object DefinitionsService { branchParam = parameter.branchParam, hintText = parameter.hintText, label = parameter.label, - requiredParam = !parameter.isOptional, + requiredParam = Some(!parameter.isOptional), ) } diff --git a/docs-internal/api/nu-designer-openapi.yaml b/docs-internal/api/nu-designer-openapi.yaml index 215692d8583..e98d3188696 100644 --- a/docs-internal/api/nu-designer-openapi.yaml +++ b/docs-internal/api/nu-designer-openapi.yaml @@ -7002,7 +7002,6 @@ components: - additionalVariables - branchParam - label - - requiredParam properties: name: type: string @@ -7036,7 +7035,9 @@ components: label: type: string requiredParam: - type: boolean + type: + - boolean + - 'null' UIValueParameterDto: title: UIValueParameterDto type: object From 6fed4c1898d0c4e2515975190f80dc2cf7fbf93b Mon Sep 17 00:00:00 2001 From: Maciej Cichanowicz <30436981+Elmacioro@users.noreply.github.com> Date: Fri, 29 Nov 2024 16:52:36 +0100 Subject: [PATCH 3/7] Fix race conditions failed to instantiate CompiledExpression exception (#7240) --- docs/Changelog.md | 1 + .../engine/spel/SpelExpression.scala | 20 ++++++-- .../engine/spel/SpelExpressionSpec.scala | 48 +++++++++++++++++++ 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index b323ce6b6e6..01f95616103 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -22,6 +22,7 @@ * [#7207](https://github.com/TouK/nussknacker/pull/7207) Fixed minor clipboard, keyboard and focus related bugs * [#7237](https://github.com/TouK/nussknacker/pull/7237) Fix: ToJsonEncoder keeps order fields during encoding map +* [#7240](https://github.com/TouK/nussknacker/pull/7240) Fixed race condition problem during SpEL expression evaluation ### 1.18.0 (22 November 2024) diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala index a521545c97f..40fd86d1d1f 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/spel/SpelExpression.scala @@ -31,6 +31,7 @@ import pl.touk.nussknacker.engine.spel.SpelExpressionParseError.ExpressionCompil import pl.touk.nussknacker.engine.spel.SpelExpressionParser.Flavour import pl.touk.nussknacker.engine.spel.internal.EvaluationContextPreparer +import java.util.concurrent.atomic.AtomicBoolean import scala.util.control.NonFatal /** @@ -51,11 +52,24 @@ final case class ParsedSpelExpression( parser: () => ValidatedNel[ExpressionParseError, Expression], initial: Expression ) extends LazyLogging { - @volatile var parsed: Expression = initial + @volatile var parsed: Expression = initial + private val firstInterpretationFinished = new AtomicBoolean() def getValue[T](context: EvaluationContext, desiredResultType: Class[_]): T = { - def value(): T = parsed.getValue(context, desiredResultType).asInstanceOf[T] - + def value(): T = { + // There is a bug in Spring's SpelExpression class: interpretedCount variable is not synchronized with ReflectiveMethodExecutor.didArgumentConversionOccur. + // The latter mentioned method check argumentConversionOccurred Boolean which could be false not because conversion not occurred but because method.invoke() + // isn't finished yet. Due to this problem an expression that shouldn't be compiled might be compiled. It generates IllegalStateException errors in further evaluations of the expression. + if (!firstInterpretationFinished.get()) { + synchronized { + val valueToReturn = parsed.getValue(context, desiredResultType).asInstanceOf[T] + firstInterpretationFinished.set(true) + valueToReturn + } + } else { + parsed.getValue(context, desiredResultType).asInstanceOf[T] + } + } try { value() } catch { diff --git a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala index 03a1c429cb2..d8ce5731bc1 100644 --- a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala +++ b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/spel/SpelExpressionSpec.scala @@ -11,6 +11,7 @@ import org.scalatest.OptionValues import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableDrivenPropertyChecks._ +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks import org.springframework.util.{NumberUtils, StringUtils} import pl.touk.nussknacker.engine.api.context.ValidationContext @@ -67,11 +68,14 @@ import java.nio.charset.{Charset, StandardCharsets} import java.time.chrono.{ChronoLocalDate, ChronoLocalDateTime} import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneId, ZoneOffset} import java.util +import java.util.concurrent.Executors import java.util.{Collections, Currency, List => JList, Locale, Map => JMap, Optional, UUID} import scala.annotation.varargs +import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.jdk.CollectionConverters._ import scala.language.implicitConversions import scala.reflect.runtime.universe._ +import scala.util.{Failure, Success} class SpelExpressionSpec extends AnyFunSuite with Matchers with ValidatedValuesDetailedMessage with OptionValues { @@ -2052,6 +2056,50 @@ class SpelExpressionSpec extends AnyFunSuite with Matchers with ValidatedValuesD parsed.evaluateSync[Any](customCtx) shouldBe 11 } + // This test is ignored as it was indeterministic and ugly, but it was used to verify race condition problems on + // ParsedSpelExpression.getValue. Without the synchronized block inside its method the test would fail the majority of times + ignore( + "should not throw 'Failed to instantiate CompiledExpression' when getValue is called on ParsedSpelExpression by multiple threads" + ) { + val spelExpression = + parse[LocalDateTime]("T(java.time.LocalDateTime).now().minusDays(14)", ctx).validValue.expression + .asInstanceOf[SpelExpression] + + val threadPool = Executors.newFixedThreadPool(1000) + implicit val customExecutionContext: ExecutionContext = ExecutionContext.fromExecutor(threadPool) + + // A promise to signal when an exception occurs + val failurePromise = Promise[Unit]() + + val tasks = (1 to 10000).map { _ => + Future { + try { + Thread.sleep(100) + // evaluate calls getValue on problematic SpelExpression object + spelExpression.evaluate[LocalDateTime](Context("fooId"), Map.empty) + } catch { + // The real problematic exception is wrapped in SpelExpressionEvaluationException by evaluate method + case e: SpelExpressionEvaluationException => + failurePromise.tryFailure(e.cause) + } + } + } + val firstFailureOrCompletion = Future.firstCompletedOf(Seq(Future.sequence(tasks), failurePromise.future)) + + firstFailureOrCompletion.onComplete { + case Success(_) => + println("All tasks completed successfully.") + threadPool.shutdown() + case Failure(e: IllegalStateException) if e.getMessage == "Failed to instantiate CompiledExpression" => + fail("Exception occurred due to race condition.", e) + threadPool.shutdown() + case Failure(e) => + fail("Unknown exception occurred", e) + threadPool.shutdown() + } + Await.result(firstFailureOrCompletion, 15.seconds) + } + } case class SampleObject(list: java.util.List[SampleValue]) From e6b9a687c2a1235f671fc64a409777c054c2cf44 Mon Sep 17 00:00:00 2001 From: JulianWielga Date: Mon, 2 Dec 2024 17:18:15 +0100 Subject: [PATCH 4/7] scroll view to cursor (#7269) --- .../editors/expression/AceWithSettings.tsx | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/designer/client/src/components/graph/node-modal/editors/expression/AceWithSettings.tsx b/designer/client/src/components/graph/node-modal/editors/expression/AceWithSettings.tsx index 374e97835df..51d7b19a96a 100644 --- a/designer/client/src/components/graph/node-modal/editors/expression/AceWithSettings.tsx +++ b/designer/client/src/components/graph/node-modal/editors/expression/AceWithSettings.tsx @@ -41,23 +41,19 @@ export default forwardRef(function AceWithSettings( const editor = editorRef.current?.editor; const selection = editor?.session.selection; - // before setting cursor position ensure all position calculations are actual - const prepare = () => editor?.renderer.updateFull(true); - const scrollToView = throttle( () => { - document.activeElement.scrollIntoView({ block: "nearest", inline: "nearest" }); + // before setting cursor position ensure all position calculations are actual + editor?.renderer.updateFull(true); + const activeElement = editor.container.querySelector(".ace_cursor") || document.activeElement; + activeElement.scrollIntoView({ block: "nearest", inline: "nearest" }); }, 150, { leading: false }, ); - editor?.addEventListener("mousedown", prepare); - editor?.addEventListener("mouseup", scrollToView); selection?.on("changeCursor", scrollToView); return () => { - editor?.removeEventListener("mousedown", prepare); - editor?.removeEventListener("mouseup", scrollToView); selection?.off("changeCursor", scrollToView); }; }, []); From 7a586a48c0403641d0b28094ee49625544b5b53f Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Mon, 2 Dec 2024 20:04:11 +0100 Subject: [PATCH 5/7] changelog entry added --- docs/Changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/Changelog.md b/docs/Changelog.md index 01f95616103..8f928107a9c 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -23,6 +23,7 @@ * [#7207](https://github.com/TouK/nussknacker/pull/7207) Fixed minor clipboard, keyboard and focus related bugs * [#7237](https://github.com/TouK/nussknacker/pull/7237) Fix: ToJsonEncoder keeps order fields during encoding map * [#7240](https://github.com/TouK/nussknacker/pull/7240) Fixed race condition problem during SpEL expression evaluation +* [#7269](https://github.com/TouK/nussknacker/pull/7269) Fixed focus scrolling in expression editor ### 1.18.0 (22 November 2024) From 7642e4d1a89de49dc7fa402560ef19567c28951d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Solarski?= Date: Tue, 3 Dec 2024 14:49:26 +0100 Subject: [PATCH 6/7] Savepoint deserialization fixup - The class is an inner class, but not statically accessible. (#7270) (#7272) --- docs/Changelog.md | 1 + ...gResultAwareTypeInformationDetection.scala | 15 +----- .../TypedJavaMapBasedTypeInformation.scala | 20 +++---- .../TypedObjectBasedTypeInformation.scala | 53 ++++++++++++------- .../TypedScalaMapBasedTypeInformation.scala | 20 +++---- ...ultAwareTypeInformationDetectionSpec.scala | 17 +++--- 6 files changed, 57 insertions(+), 69 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index 8f928107a9c..2e945a86e9e 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -24,6 +24,7 @@ * [#7237](https://github.com/TouK/nussknacker/pull/7237) Fix: ToJsonEncoder keeps order fields during encoding map * [#7240](https://github.com/TouK/nussknacker/pull/7240) Fixed race condition problem during SpEL expression evaluation * [#7269](https://github.com/TouK/nussknacker/pull/7269) Fixed focus scrolling in expression editor +* [#7270](https://github.com/TouK/nussknacker/pull/7270) Savepoint deserialization fixup - some taken savepoints (e.g. for scenarios with async enrichers) were not deserializable which led to errors during redeployments on Flink ### 1.18.0 (22 November 2024) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala index 131137fd1bd..7d89fb427cd 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.engine.process.typeinformation import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} -import org.apache.flink.api.common.typeutils.{CompositeTypeSerializerUtil, TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.api.java.typeutils.{ListTypeInfo, MapTypeInfo, MultisetTypeInfo, RowTypeInfo} import org.apache.flink.types.Row import pl.touk.nussknacker.engine.api.context.ValidationContext @@ -99,20 +98,10 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection } private def createScalaMapTypeInformation(typingResult: TypedObjectTypingResult) = - TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult) + TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType)) private def createJavaMapTypeInformation(typingResult: TypedObjectTypingResult) = - TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult) - - protected def constructIntermediateCompatibilityResult( - newNestedSerializers: Array[TypeSerializer[_]], - oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]] - ): CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing] = { - CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult( - newNestedSerializers.map(_.snapshotConfiguration()), - oldNestedSerializerSnapshots - ) - } + TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType)) def forValueWithContext[T]( validationContext: ValidationContext, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala index c2f7fd96633..df03b2c5c15 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala @@ -3,42 +3,36 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject import java.{util => jutil} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} -import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult case class TypedJavaMapTypeInformation( - informations: Map[String, TypeInformation[_]], - buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult + informations: Map[String, TypeInformation[_]] ) extends TypedObjectBasedTypeInformation[jutil.Map[String, AnyRef]](informations) { override def createSerializer( serializers: Array[(String, TypeSerializer[_])] ): TypeSerializer[jutil.Map[String, AnyRef]] = - TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) + TypedJavaMapSerializer(serializers) } @SerialVersionUID(1L) case class TypedJavaMapSerializer( - override val serializers: Array[(String, TypeSerializer[_])], - override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult + override val serializers: Array[(String, TypeSerializer[_])] ) extends TypedObjectBasedTypeSerializer[jutil.Map[String, AnyRef]](serializers) with BaseJavaMapBasedSerializer[AnyRef, jutil.Map[String, AnyRef]] { override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[jutil.Map[String, AnyRef]] = - TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) + TypedJavaMapSerializer(serializers) override def createInstance(): jutil.Map[String, AnyRef] = new jutil.HashMap() override def snapshotConfiguration( snapshots: Array[(String, TypeSerializerSnapshot[_])] - ): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots) { - override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult = - buildIntermediateSchemaCompatibilityResultFunction - } + ): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots) } -abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] { +final class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] { def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = { this() @@ -48,6 +42,6 @@ abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializer override protected def restoreSerializer( restored: Array[(String, TypeSerializer[_])] ): TypeSerializer[jutil.Map[String, AnyRef]] = - TypedJavaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult) + TypedJavaMapSerializer(restored) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala index 530cba98149..e7ca4c1e5a8 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala @@ -4,6 +4,7 @@ import com.github.ghik.silencer.silent import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult import org.apache.flink.api.common.typeutils.{ CompositeTypeSerializerUtil, TypeSerializer, @@ -11,7 +12,6 @@ import org.apache.flink.api.common.typeutils.{ TypeSerializerSnapshot } import org.apache.flink.core.memory.{DataInputView, DataOutputView} -import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult import scala.reflect.ClassTag @@ -57,15 +57,6 @@ abstract class TypedObjectBasedTypeInformation[T: ClassTag](informations: Array[ def createSerializer(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T] } -object TypedObjectBasedTypeInformation { - - type BuildIntermediateSchemaCompatibilityResult = ( - Array[TypeSerializer[_]], - Array[TypeSerializerSnapshot[_]] - ) => CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing] - -} - //We use Array instead of List here, as we need access by index, which is faster for array abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String, TypeSerializer[_])]) extends TypeSerializer[T] @@ -132,17 +123,13 @@ abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String, def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T] - def buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult } abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnapshot[T] with LazyLogging { - protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _ + private val constructIntermediateCompatibilityResultMethodName = "constructIntermediateCompatibilityResult" - def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = { - this() - this.serializersSnapshots = serializers - } + protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _ override def getCurrentVersion: Int = 1 @@ -182,10 +169,10 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps val newKeys = newSerializers.map(_._1) val commons = currentKeys.intersect(newKeys) - val newSerializersToUse = newSerializers.filter(k => commons.contains(k._1)) - val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1)) + val newSerializersToUse: Array[(String, TypeSerializer[_])] = newSerializers.filter(k => commons.contains(k._1)) + val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1)) - val fieldsCompatibility = buildIntermediateSchemaCompatibilityResult( + val fieldsCompatibility = constructIntermediateCompatibilityResultProxied( newSerializersToUse.map(_._2), snapshotsToUse.map(_._2) ) @@ -237,7 +224,33 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps } } - val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult + private def constructIntermediateCompatibilityResultProxied( + newNestedSerializers: Array[TypeSerializer[_]], + nestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]] + ): IntermediateCompatibilityResult[_] = { + // signature of CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult has been changed between flink 1.18/1.19 + // Because of contract of serialization/deserialization of TypeSerializerSnapshot in can't be easily provided by TypeInformationDetection SPI mechanism + try { + val newMethod = classOf[CompositeTypeSerializerUtil].getMethod( + constructIntermediateCompatibilityResultMethodName, + classOf[Array[TypeSerializerSnapshot[_]]], + classOf[Array[TypeSerializerSnapshot[_]]] + ) + newMethod + .invoke(null, newNestedSerializers.map(_.snapshotConfiguration()), nestedSerializerSnapshots) + .asInstanceOf[IntermediateCompatibilityResult[_]] + } catch { + case _: NoSuchMethodException => + val oldMethod = classOf[CompositeTypeSerializerUtil].getMethod( + constructIntermediateCompatibilityResultMethodName, + classOf[Array[TypeSerializer[_]]], + classOf[Array[TypeSerializerSnapshot[_]]] + ) + oldMethod + .invoke(null, newNestedSerializers, nestedSerializerSnapshots) + .asInstanceOf[IntermediateCompatibilityResult[_]] + } + } override def restoreSerializer(): TypeSerializer[T] = restoreSerializer(serializersSnapshots.map { case (k, snapshot) => (k, snapshot.restoreSerializer()) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala index b987369b534..4b204c0418a 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala @@ -3,24 +3,21 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} -import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult case class TypedScalaMapTypeInformation( - informations: Map[String, TypeInformation[_]], - buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult + informations: Map[String, TypeInformation[_]] ) extends TypedObjectBasedTypeInformation[Map[String, _ <: AnyRef]](informations) { override def createSerializer( serializers: Array[(String, TypeSerializer[_])] ): TypeSerializer[Map[String, _ <: AnyRef]] = - TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) + TypedScalaMapSerializer(serializers) } @SerialVersionUID(1L) case class TypedScalaMapSerializer( - override val serializers: Array[(String, TypeSerializer[_])], - override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult + override val serializers: Array[(String, TypeSerializer[_])] ) extends TypedObjectBasedTypeSerializer[Map[String, _ <: AnyRef]](serializers) with LazyLogging { @@ -36,20 +33,17 @@ case class TypedScalaMapSerializer( override def get(value: Map[String, _ <: AnyRef], k: String): AnyRef = value.getOrElse(k, null) override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[Map[String, _ <: AnyRef]] = - TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) + TypedScalaMapSerializer(serializers) override def createInstance(): Map[String, _ <: AnyRef] = Map.empty override def snapshotConfiguration( snapshots: Array[(String, TypeSerializerSnapshot[_])] - ): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots) { - override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult = - buildIntermediateSchemaCompatibilityResultFunction - } + ): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots) } -abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] { +final class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] { def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = { this() @@ -59,6 +53,6 @@ abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerialize override protected def restoreSerializer( restored: Array[(String, TypeSerializer[_])] ): TypeSerializer[Map[String, _ <: AnyRef]] = - TypedScalaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult) + TypedScalaMapSerializer(restored) } diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala index 8ed2795c1d6..f0b9b72f8c7 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala @@ -264,20 +264,17 @@ class TypingResultAwareTypeInformationDetectionSpec } private def assertNested(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_] => Assertion)*): Unit = { - inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { - case TypedScalaMapSerializer(array, _) => - array.zipAll(nested.toList, null, null).foreach { - case ((name, serializer), (expectedName, expectedSerializer)) => - name shouldBe expectedName - expectedSerializer(serializer) - } + inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) => + array.zipAll(nested.toList, null, null).foreach { case ((name, serializer), (expectedName, expectedSerializer)) => + name shouldBe expectedName + expectedSerializer(serializer) + } } } private def assertMapSerializers(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_])*) = { - inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { - case TypedScalaMapSerializer(array, _) => - array.toList shouldBe nested.toList + inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) => + array.toList shouldBe nested.toList } } From 0191821d53a82eb2954e286ab5311b72fea22429 Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Tue, 3 Dec 2024 21:44:53 +0100 Subject: [PATCH 7/7] Backport of memory settings fixups from https://github.com/TouK/nussknacker-quickstart/pull/197 and https://github.com/TouK/nussknacker-quickstart/pull/198/files to installation example --- docs/Changelog.md | 1 + examples/installation/docker-compose.yml | 4 ++-- examples/installation/flink/flink-properties.yml | 4 ++++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index 2e945a86e9e..11d9f3e4c79 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -25,6 +25,7 @@ * [#7240](https://github.com/TouK/nussknacker/pull/7240) Fixed race condition problem during SpEL expression evaluation * [#7269](https://github.com/TouK/nussknacker/pull/7269) Fixed focus scrolling in expression editor * [#7270](https://github.com/TouK/nussknacker/pull/7270) Savepoint deserialization fixup - some taken savepoints (e.g. for scenarios with async enrichers) were not deserializable which led to errors during redeployments on Flink +* [#7279](https://github.com/TouK/nussknacker/pull/7279) Fixed Flink TaskManager and Designer containers restarts in installation example ### 1.18.0 (22 November 2024) diff --git a/examples/installation/docker-compose.yml b/examples/installation/docker-compose.yml index a8e977f117a..8bc5c3e84ad 100644 --- a/examples/installation/docker-compose.yml +++ b/examples/installation/docker-compose.yml @@ -40,7 +40,7 @@ services: SCHEMA_REGISTRY_URL: "http://schema-registry:8081" INFLUXDB_URL: "http://influxdb:8086" FLINK_REST_URL: "http://flink-jobmanager:8081" - JDK_JAVA_OPTIONS: "-Xmx1024M" + JDK_JAVA_OPTIONS: "-Xmx400M -XX:MaxMetaspaceSize=300M -XX:MaxDirectMemorySize=100M" USAGE_REPORTS_SOURCE: "example-installation-docker-compose" depends_on: postgres: @@ -241,7 +241,7 @@ services: deploy: resources: limits: - memory: 1024M + memory: 1500M telegraf: image: telegraf:1.30.2 diff --git a/examples/installation/flink/flink-properties.yml b/examples/installation/flink/flink-properties.yml index dd0b7fed688..c64ec68ca3e 100644 --- a/examples/installation/flink/flink-properties.yml +++ b/examples/installation/flink/flink-properties.yml @@ -1,4 +1,8 @@ taskmanager.numberOfTaskSlots: 8 +# Nu requires a little bit more metaspace than Flink default allocate based on process size +taskmanager.memory.process.size: 1500m +taskmanager.memory.jvm-metaspace.size: 400m + state.backend.type: filesystem state.checkpoints.dir: file:///opt/flink/data/checkpoints state.savepoints.dir: file:///opt/flink/data/savepoints