From 9f266e42c1208ee76ac9e43eb9c7b8faa9ec731f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Bigorajski?= Date: Tue, 25 Feb 2025 15:03:57 +0100 Subject: [PATCH] fixes --- .../api/component/NodesDeploymentData.scala | 3 +-- .../engine/api/process/ComponentUseContext.scala | 16 +++++++++------- .../openapi/functional/OpenAPIServiceSpec.scala | 2 +- .../nussknacker/openapi/BaseOpenAPITest.scala | 9 ++++----- .../engine/flink/table/source/TableSource.scala | 3 +-- .../registrar/FlinkProcessRegistrar.scala | 2 +- .../functional/GenericTransformationSpec.scala | 5 ++--- .../engine/process/functional/ProcessSpec.scala | 4 ++-- .../process/helpers/ProcessTestHelpers.scala | 2 +- .../runner/SimpleProcessConfigCreator.scala | 8 ++++---- .../engine/process/helpers/SampleNodes.scala | 6 +++--- .../nussknacker/engine/ComponentUseCase.scala | 2 +- .../pl/touk/nussknacker/engine/Interpreter.scala | 2 +- .../compile/nodecompilation/NodeCompiler.scala | 2 +- .../MethodBasedServiceInvokerTest.scala | 2 +- 15 files changed, 33 insertions(+), 35 deletions(-) diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala index d197c7a111d..5951b1a96f2 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala @@ -4,8 +4,7 @@ import io.circe.{Decoder, Encoder} import pl.touk.nussknacker.engine.api.NodeId import pl.touk.nussknacker.engine.api.component.NodesDeploymentData.NodeDeploymentData -// todo make private -final case class NodesDeploymentData(dataByNodeId: Map[NodeId, NodeDeploymentData]) { +final case class NodesDeploymentData(private val dataByNodeId: Map[NodeId, NodeDeploymentData]) { def get(nodeId: NodeId): Option[NodeDeploymentData] = dataByNodeId.get(nodeId) } diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/ComponentUseContext.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/ComponentUseContext.scala index 6f4c1399207..3e867974573 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/ComponentUseContext.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/ComponentUseContext.scala @@ -14,17 +14,19 @@ import pl.touk.nussknacker.engine.api.process.ComponentUseContext.EngineRuntime * */ sealed trait ComponentUseContext { + def deploymentData(): Option[NodeDeploymentData] = this match { - case EngineRuntime(nodeData) => Some(nodeData) - case _ => None + case EngineRuntime(nodeData) => nodeData + case _ => None } + } object ComponentUseContext { - case class EngineRuntime(nodeData: NodeDeploymentData) extends ComponentUseContext - case object TestRuntime extends ComponentUseContext - case object Validation extends ComponentUseContext - case object ServiceQuery extends ComponentUseContext - case object TestDataGeneration extends ComponentUseContext + case class EngineRuntime(nodeData: Option[NodeDeploymentData]) extends ComponentUseContext + case object TestRuntime extends ComponentUseContext + case object Validation extends ComponentUseContext + case object ServiceQuery extends ComponentUseContext + case object TestDataGeneration extends ComponentUseContext } diff --git a/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenAPIServiceSpec.scala b/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenAPIServiceSpec.scala index 458c8866b99..5b108dbc6d1 100644 --- a/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenAPIServiceSpec.scala +++ b/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenAPIServiceSpec.scala @@ -31,7 +31,7 @@ class OpenAPIServiceSpec with LazyLogging with PatientScalaFutures { - implicit val componentUseCase: ComponentUseContext = ComponentUseContext.EngineRuntime(Map.empty) + implicit val componentUseCase: ComponentUseContext = ComponentUseContext.EngineRuntime(None) implicit val metaData: MetaData = MetaData("testProc", StreamMetaData()) implicit val context: Context = Context("testContextId", Map.empty) val jobData = JobData(metaData, ProcessVersion.empty.copy(processName = metaData.name)) diff --git a/components/openapi/src/test/scala/pl/touk/nussknacker/openapi/BaseOpenAPITest.scala b/components/openapi/src/test/scala/pl/touk/nussknacker/openapi/BaseOpenAPITest.scala index 678f0db91fa..f7d41b7eb85 100644 --- a/components/openapi/src/test/scala/pl/touk/nussknacker/openapi/BaseOpenAPITest.scala +++ b/components/openapi/src/test/scala/pl/touk/nussknacker/openapi/BaseOpenAPITest.scala @@ -3,9 +3,8 @@ package pl.touk.nussknacker.openapi import cats.data.Validated import cats.data.Validated.{Invalid, Valid} import org.apache.commons.io.IOUtils -import pl.touk.nussknacker.engine.api.process.ComponentUseContext import pl.touk.nussknacker.engine.api._ -import pl.touk.nussknacker.engine.api.component.NodesDeploymentData +import pl.touk.nussknacker.engine.api.process.ComponentUseContext import pl.touk.nussknacker.engine.util.runtimecontext.TestEngineRuntimeContext import pl.touk.nussknacker.engine.util.service.EagerServiceWithStaticParametersAndReturnType import pl.touk.nussknacker.openapi.enrichers.{SwaggerEnricherCreator, SwaggerEnrichers} @@ -20,9 +19,9 @@ trait BaseOpenAPITest { protected val baseConfig: OpenAPIServicesConfig = OpenAPIServicesConfig(new URL("http://foo")) - implicit val componentUseCase: ComponentUseContext = ComponentUseContext.EngineRuntime(NodesDeploymentData.empty) - implicit val metaData: MetaData = MetaData("testProc", StreamMetaData()) - implicit val context: Context = Context("testContextId", Map.empty) + implicit val componentUseContext: ComponentUseContext = ComponentUseContext.EngineRuntime(Map.empty) + implicit val metaData: MetaData = MetaData("testProc", StreamMetaData()) + implicit val context: Context = Context("testContextId", Map.empty) private val jobData = JobData(metaData, ProcessVersion.empty.copy(processName = metaData.name)) private val runtimeContext = TestEngineRuntimeContext(jobData) diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala index f5de5b9e3fc..b341900acf4 100644 --- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala +++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.engine.flink.table.source; +package pl.touk.nussknacker.engine.flink.table.source import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment @@ -8,7 +8,6 @@ import org.apache.flink.table.catalog.Column.{ComputedColumn, MetadataColumn, Ph import org.apache.flink.types.Row import pl.touk.nussknacker.engine.api.definition.Parameter import pl.touk.nussknacker.engine.api.parameter.ParameterName -import pl.touk.nussknacker.engine.api.process.ComponentUseContext.EngineRuntime import pl.touk.nussknacker.engine.api.process.{ BasicContextInitializer, ContextInitializer, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala index b3b404a0378..042d8e7c911 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala @@ -153,7 +153,7 @@ class FlinkProcessRegistrar( globalParameters = globalParameters, validationContext, compilerData.componentUseCase.toContext( - deploymentData.nodesData.get(NodeId(nodeComponentId.nodeId)).getOrElse(Map.empty) + deploymentData.nodesData.get(NodeId(nodeComponentId.nodeId)) ), // TODO: we should verify if component supports given node data type. If not, we should throw some error instead // of silently skip these data diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/GenericTransformationSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/GenericTransformationSpec.scala index 24ab4a50b13..6398ed70c19 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/GenericTransformationSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/GenericTransformationSpec.scala @@ -2,7 +2,6 @@ package pl.touk.nussknacker.engine.process.functional import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import pl.touk.nussknacker.engine.api.component.NodesDeploymentData import pl.touk.nussknacker.engine.api.process.ComponentUseContext import pl.touk.nussknacker.engine.api.typed.TypedMap import pl.touk.nussknacker.engine.build.ScenarioBuilder @@ -80,7 +79,7 @@ class GenericTransformationSpec extends AnyFunSuite with Matchers with ProcessTe processInvoker.invokeWithSampleData(process, Nil) ProcessTestHelpers.genericParameterSinkResultsHolder.results shouldBe List( - s"type2-3+type1-2+componentUseCase:${ComponentUseContext.EngineRuntime()}" + s"type2-3+type1-2+componentUseCase:${ComponentUseContext.EngineRuntime(None)}" ) } @@ -108,7 +107,7 @@ class GenericTransformationSpec extends AnyFunSuite with Matchers with ProcessTe processInvoker.invokeWithSampleData(process, Nil) ProcessTestHelpers.genericParameterSinkResultsHolder.results shouldBe List( - s"test|transformed:test|4+type1-2+componentUseCase:${ComponentUseContext.EngineRuntime(NodesDeploymentData.empty)}" + s"test|transformed:test|4+type1-2+componentUseCase:${ComponentUseContext.EngineRuntime(None)}" ) } diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/ProcessSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/ProcessSpec.scala index 9cde1122baf..fd02536c9a2 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/ProcessSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/functional/ProcessSpec.scala @@ -3,10 +3,10 @@ package pl.touk.nussknacker.engine.process.functional import org.scalatest.LoneElement._ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.ComponentUseCase import pl.touk.nussknacker.engine.api.StreamMetaData import pl.touk.nussknacker.engine.api.component.{ComponentType, NodeComponentInfo} import pl.touk.nussknacker.engine.api.exception.NonTransientException -import pl.touk.nussknacker.engine.api.process.ComponentUseContext import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder} import pl.touk.nussknacker.engine.flink.test.{RecordingExceptionConsumer, RecordingExceptionConsumerProvider} import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory @@ -292,7 +292,7 @@ class ProcessSpec extends AnyFunSuite with Matchers with ProcessTestHelpers { processInvoker.invokeWithSampleData(process, data) - ProcessTestHelpers.sinkForStringsResultsHolder.results.loneElement shouldBe ComponentUseContext.EngineRuntime.toString + ProcessTestHelpers.sinkForStringsResultsHolder.results.loneElement shouldBe ComponentUseCase.EngineRuntime.toString } test("should handle errors on branches after split independently") { diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala index d7b4393e4ee..79f0691b881 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala @@ -70,7 +70,7 @@ object ProcessTestHelpers extends Serializable { ComponentDefinition("eagerLifecycleService", EagerLifecycleService), ComponentDefinition("enricherWithOpenService", new EnricherWithOpenService), ComponentDefinition("serviceAcceptingOptionalValue", ServiceAcceptingScalaOption), - ComponentDefinition("returningComponentUseCaseService", ReturningComponentUseCaseService), + ComponentDefinition("returningComponentUseCaseService", ReturningComponentUseContextService), ComponentDefinition( "throwingNonTransientErrors", new ThrowingService(NonTransientException("test input", "test msg")) diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/SimpleProcessConfigCreator.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/SimpleProcessConfigCreator.scala index 008ba927175..0a8f5548b88 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/SimpleProcessConfigCreator.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/SimpleProcessConfigCreator.scala @@ -17,10 +17,10 @@ class SimpleProcessConfigCreator extends EmptyProcessConfigCreator { Map( "logService" -> WithCategories(LogService, "c1"), "throwingService" -> WithCategories(new ThrowingService(new RuntimeException("Thrown as expected")), "c1"), - "throwingTransientService" -> WithCategories(new ThrowingService(new ConnectException()), "c1"), - "returningDependentTypeService" -> WithCategories(ReturningDependentTypeService, "c1"), - "collectingEager" -> WithCategories(CollectingEagerService, "c1"), - "returningComponentUseCaseService" -> WithCategories(ReturningComponentUseCaseService, "c1") + "throwingTransientService" -> WithCategories(new ThrowingService(new ConnectException()), "c1"), + "returningDependentTypeService" -> WithCategories(ReturningDependentTypeService, "c1"), + "collectingEager" -> WithCategories(CollectingEagerService, "c1"), + "returningComponentUseContextService" -> WithCategories(ReturningComponentUseContextService, "c1") ) override def sinkFactories( diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala index b5e5df181ad..df1ff057052 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala +++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala @@ -1091,11 +1091,11 @@ object SampleNodes { @JsonCodec case class KeyValue(key: String, value: Int, date: Long) - object ReturningComponentUseCaseService extends Service with Serializable { + object ReturningComponentUseContextService extends Service with Serializable { @MethodToInvoke - def invoke(implicit componentUseCase: ComponentUseContext): Future[ComponentUseContext] = { - Future.successful(componentUseCase) + def invoke(implicit componentUseContext: ComponentUseContext): Future[ComponentUseContext] = { + Future.successful(componentUseContext) } } diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ComponentUseCase.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ComponentUseCase.scala index 9b4bf6f6b21..1ec86eed25d 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ComponentUseCase.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ComponentUseCase.scala @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.process.ComponentUseContext sealed trait ComponentUseCase { - def toContext(nodeDeploymentData: NodeDeploymentData): ComponentUseContext = this match { + def toContext(nodeDeploymentData: Option[NodeDeploymentData]): ComponentUseContext = this match { case EngineRuntime => ComponentUseContext.EngineRuntime(nodeDeploymentData) case TestRuntime => ComponentUseContext.TestRuntime case Validation => ComponentUseContext.Validation diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/Interpreter.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/Interpreter.scala index 39bdb3cf2e4..290e5e871cd 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/Interpreter.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/Interpreter.scala @@ -249,7 +249,7 @@ private class InterpreterInternal[F[_]: Monad]( } private def invoke(ref: ServiceRef, ctx: Context)(implicit node: Node) = { - val nodeDeploymentData = nodesDeploymentData.get(NodeId(node.id)).getOrElse(Map.empty) + val nodeDeploymentData = nodesDeploymentData.get(NodeId(node.id)) implicit val componentUseContext: ComponentUseContext = componentUseCase.toContext(nodeDeploymentData) val resultFuture = ref.invoke(ctx, serviceExecutionContext) import SynchronousExecutionContextAndIORuntime.syncEc diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/NodeCompiler.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/NodeCompiler.scala index 768a7c39e97..f3d8e6c5da4 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/NodeCompiler.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/NodeCompiler.scala @@ -577,7 +577,7 @@ class NodeCompiler( compiledParameters, outputVariableNameOpt, additionalDependencies, - componentUseCase.toContext(nodesDeploymentData.get(nodeId).getOrElse(Map.empty)), + componentUseCase.toContext(nodesDeploymentData.get(nodeId)), nonServicesLazyParamStrategy ) .map { componentExecutor => diff --git a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/compile/nodecompilation/MethodBasedServiceInvokerTest.scala b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/compile/nodecompilation/MethodBasedServiceInvokerTest.scala index 146189b222f..8b2d7196746 100644 --- a/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/compile/nodecompilation/MethodBasedServiceInvokerTest.scala +++ b/scenario-compiler/src/test/scala/pl/touk/nussknacker/engine/compile/nodecompilation/MethodBasedServiceInvokerTest.scala @@ -20,7 +20,7 @@ class ServiceInvokerTest extends AnyFlatSpec with PatientScalaFutures with Optio import scala.concurrent.ExecutionContext.Implicits.global private implicit val metadata: MetaData = MetaData("proc1", StreamMetaData()) - private implicit val componentUseContext: ComponentUseContext = ComponentUseContext.EngineRuntime(Map.empty) + private implicit val componentUseContext: ComponentUseContext = ComponentUseContext.EngineRuntime(None) private val context: Context = Context.withInitialId private val nodeId = NodeId("id")