Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Łukasz Bigorajski committed Feb 25, 2025
1 parent 0623ba8 commit 9f266e4
Show file tree
Hide file tree
Showing 15 changed files with 33 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import io.circe.{Decoder, Encoder}
import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData.NodeDeploymentData

// todo make private
final case class NodesDeploymentData(dataByNodeId: Map[NodeId, NodeDeploymentData]) {
final case class NodesDeploymentData(private val dataByNodeId: Map[NodeId, NodeDeploymentData]) {
def get(nodeId: NodeId): Option[NodeDeploymentData] = dataByNodeId.get(nodeId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@ import pl.touk.nussknacker.engine.api.process.ComponentUseContext.EngineRuntime
* </ul>
*/
sealed trait ComponentUseContext {

def deploymentData(): Option[NodeDeploymentData] = this match {
case EngineRuntime(nodeData) => Some(nodeData)
case _ => None
case EngineRuntime(nodeData) => nodeData
case _ => None
}

}

object ComponentUseContext {
case class EngineRuntime(nodeData: NodeDeploymentData) extends ComponentUseContext
case object TestRuntime extends ComponentUseContext
case object Validation extends ComponentUseContext
case object ServiceQuery extends ComponentUseContext
case object TestDataGeneration extends ComponentUseContext
case class EngineRuntime(nodeData: Option[NodeDeploymentData]) extends ComponentUseContext
case object TestRuntime extends ComponentUseContext
case object Validation extends ComponentUseContext
case object ServiceQuery extends ComponentUseContext
case object TestDataGeneration extends ComponentUseContext

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class OpenAPIServiceSpec
with LazyLogging
with PatientScalaFutures {

implicit val componentUseCase: ComponentUseContext = ComponentUseContext.EngineRuntime(Map.empty)
implicit val componentUseCase: ComponentUseContext = ComponentUseContext.EngineRuntime(None)
implicit val metaData: MetaData = MetaData("testProc", StreamMetaData())
implicit val context: Context = Context("testContextId", Map.empty)
val jobData = JobData(metaData, ProcessVersion.empty.copy(processName = metaData.name))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package pl.touk.nussknacker.openapi
import cats.data.Validated
import cats.data.Validated.{Invalid, Valid}
import org.apache.commons.io.IOUtils
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.util.runtimecontext.TestEngineRuntimeContext
import pl.touk.nussknacker.engine.util.service.EagerServiceWithStaticParametersAndReturnType
import pl.touk.nussknacker.openapi.enrichers.{SwaggerEnricherCreator, SwaggerEnrichers}
Expand All @@ -20,9 +19,9 @@ trait BaseOpenAPITest {

protected val baseConfig: OpenAPIServicesConfig = OpenAPIServicesConfig(new URL("http://foo"))

implicit val componentUseCase: ComponentUseContext = ComponentUseContext.EngineRuntime(NodesDeploymentData.empty)
implicit val metaData: MetaData = MetaData("testProc", StreamMetaData())
implicit val context: Context = Context("testContextId", Map.empty)
implicit val componentUseContext: ComponentUseContext = ComponentUseContext.EngineRuntime(Map.empty)
implicit val metaData: MetaData = MetaData("testProc", StreamMetaData())
implicit val context: Context = Context("testContextId", Map.empty)
private val jobData = JobData(metaData, ProcessVersion.empty.copy(processName = metaData.name))
private val runtimeContext = TestEngineRuntimeContext(jobData)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.touk.nussknacker.engine.flink.table.source;
package pl.touk.nussknacker.engine.flink.table.source

import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
Expand All @@ -8,7 +8,6 @@ import org.apache.flink.table.catalog.Column.{ComputedColumn, MetadataColumn, Ph
import org.apache.flink.types.Row
import pl.touk.nussknacker.engine.api.definition.Parameter
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.ComponentUseContext.EngineRuntime
import pl.touk.nussknacker.engine.api.process.{
BasicContextInitializer,
ContextInitializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class FlinkProcessRegistrar(
globalParameters = globalParameters,
validationContext,
compilerData.componentUseCase.toContext(
deploymentData.nodesData.get(NodeId(nodeComponentId.nodeId)).getOrElse(Map.empty)
deploymentData.nodesData.get(NodeId(nodeComponentId.nodeId))
),
// TODO: we should verify if component supports given node data type. If not, we should throw some error instead
// of silently skip these data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pl.touk.nussknacker.engine.process.functional

import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.api.typed.TypedMap
import pl.touk.nussknacker.engine.build.ScenarioBuilder
Expand Down Expand Up @@ -80,7 +79,7 @@ class GenericTransformationSpec extends AnyFunSuite with Matchers with ProcessTe
processInvoker.invokeWithSampleData(process, Nil)

ProcessTestHelpers.genericParameterSinkResultsHolder.results shouldBe List(
s"type2-3+type1-2+componentUseCase:${ComponentUseContext.EngineRuntime()}"
s"type2-3+type1-2+componentUseCase:${ComponentUseContext.EngineRuntime(None)}"
)
}

Expand Down Expand Up @@ -108,7 +107,7 @@ class GenericTransformationSpec extends AnyFunSuite with Matchers with ProcessTe
processInvoker.invokeWithSampleData(process, Nil)

ProcessTestHelpers.genericParameterSinkResultsHolder.results shouldBe List(
s"test|transformed:test|4+type1-2+componentUseCase:${ComponentUseContext.EngineRuntime(NodesDeploymentData.empty)}"
s"test|transformed:test|4+type1-2+componentUseCase:${ComponentUseContext.EngineRuntime(None)}"
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package pl.touk.nussknacker.engine.process.functional
import org.scalatest.LoneElement._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.ComponentUseCase
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.api.component.{ComponentType, NodeComponentInfo}
import pl.touk.nussknacker.engine.api.exception.NonTransientException
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder}
import pl.touk.nussknacker.engine.flink.test.{RecordingExceptionConsumer, RecordingExceptionConsumerProvider}
import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory
Expand Down Expand Up @@ -292,7 +292,7 @@ class ProcessSpec extends AnyFunSuite with Matchers with ProcessTestHelpers {

processInvoker.invokeWithSampleData(process, data)

ProcessTestHelpers.sinkForStringsResultsHolder.results.loneElement shouldBe ComponentUseContext.EngineRuntime.toString
ProcessTestHelpers.sinkForStringsResultsHolder.results.loneElement shouldBe ComponentUseCase.EngineRuntime.toString
}

test("should handle errors on branches after split independently") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object ProcessTestHelpers extends Serializable {
ComponentDefinition("eagerLifecycleService", EagerLifecycleService),
ComponentDefinition("enricherWithOpenService", new EnricherWithOpenService),
ComponentDefinition("serviceAcceptingOptionalValue", ServiceAcceptingScalaOption),
ComponentDefinition("returningComponentUseCaseService", ReturningComponentUseCaseService),
ComponentDefinition("returningComponentUseCaseService", ReturningComponentUseContextService),
ComponentDefinition(
"throwingNonTransientErrors",
new ThrowingService(NonTransientException("test input", "test msg"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ class SimpleProcessConfigCreator extends EmptyProcessConfigCreator {
Map(
"logService" -> WithCategories(LogService, "c1"),
"throwingService" -> WithCategories(new ThrowingService(new RuntimeException("Thrown as expected")), "c1"),
"throwingTransientService" -> WithCategories(new ThrowingService(new ConnectException()), "c1"),
"returningDependentTypeService" -> WithCategories(ReturningDependentTypeService, "c1"),
"collectingEager" -> WithCategories(CollectingEagerService, "c1"),
"returningComponentUseCaseService" -> WithCategories(ReturningComponentUseCaseService, "c1")
"throwingTransientService" -> WithCategories(new ThrowingService(new ConnectException()), "c1"),
"returningDependentTypeService" -> WithCategories(ReturningDependentTypeService, "c1"),
"collectingEager" -> WithCategories(CollectingEagerService, "c1"),
"returningComponentUseContextService" -> WithCategories(ReturningComponentUseContextService, "c1")
)

override def sinkFactories(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1091,11 +1091,11 @@ object SampleNodes {

@JsonCodec case class KeyValue(key: String, value: Int, date: Long)

object ReturningComponentUseCaseService extends Service with Serializable {
object ReturningComponentUseContextService extends Service with Serializable {

@MethodToInvoke
def invoke(implicit componentUseCase: ComponentUseContext): Future[ComponentUseContext] = {
Future.successful(componentUseCase)
def invoke(implicit componentUseContext: ComponentUseContext): Future[ComponentUseContext] = {
Future.successful(componentUseContext)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.process.ComponentUseContext

sealed trait ComponentUseCase {

def toContext(nodeDeploymentData: NodeDeploymentData): ComponentUseContext = this match {
def toContext(nodeDeploymentData: Option[NodeDeploymentData]): ComponentUseContext = this match {
case EngineRuntime => ComponentUseContext.EngineRuntime(nodeDeploymentData)
case TestRuntime => ComponentUseContext.TestRuntime
case Validation => ComponentUseContext.Validation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private class InterpreterInternal[F[_]: Monad](
}

private def invoke(ref: ServiceRef, ctx: Context)(implicit node: Node) = {
val nodeDeploymentData = nodesDeploymentData.get(NodeId(node.id)).getOrElse(Map.empty)
val nodeDeploymentData = nodesDeploymentData.get(NodeId(node.id))
implicit val componentUseContext: ComponentUseContext = componentUseCase.toContext(nodeDeploymentData)
val resultFuture = ref.invoke(ctx, serviceExecutionContext)
import SynchronousExecutionContextAndIORuntime.syncEc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ class NodeCompiler(
compiledParameters,
outputVariableNameOpt,
additionalDependencies,
componentUseCase.toContext(nodesDeploymentData.get(nodeId).getOrElse(Map.empty)),
componentUseCase.toContext(nodesDeploymentData.get(nodeId)),
nonServicesLazyParamStrategy
)
.map { componentExecutor =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class ServiceInvokerTest extends AnyFlatSpec with PatientScalaFutures with Optio
import scala.concurrent.ExecutionContext.Implicits.global

private implicit val metadata: MetaData = MetaData("proc1", StreamMetaData())
private implicit val componentUseContext: ComponentUseContext = ComponentUseContext.EngineRuntime(Map.empty)
private implicit val componentUseContext: ComponentUseContext = ComponentUseContext.EngineRuntime(None)
private val context: Context = Context.withInitialId

private val nodeId = NodeId("id")
Expand Down

0 comments on commit 9f266e4

Please sign in to comment.