Skip to content

Commit

Permalink
some refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Łukasz Bigorajski committed Feb 25, 2025
1 parent c9fa8c5 commit 8a84e14
Show file tree
Hide file tree
Showing 74 changed files with 277 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,26 @@ import cats.data.Validated.{Invalid, Valid}
import cats.data.ValidatedNel
import pl.touk.nussknacker.engine.Interpreter.InterpreterShape
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, DesignerWideComponentId, UnboundedStreamComponent}
import pl.touk.nussknacker.engine.api.component.{
ComponentDefinition,
DesignerWideComponentId,
NodesDeploymentData,
UnboundedStreamComponent
}
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError
import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.compile.ProcessCompilerData
import pl.touk.nussknacker.engine.compiledgraph.part.ProcessPart
import pl.touk.nussknacker.engine.definition.component.Components
import pl.touk.nussknacker.engine.definition.component.Components.ComponentDefinitionExtractionMode
import pl.touk.nussknacker.engine.definition.component.{ComponentDefinitionWithImplementation, Components}
import pl.touk.nussknacker.engine.definition.model.{ModelDefinition, ModelDefinitionWithClasses}
import pl.touk.nussknacker.engine.dict.SimpleDictRegistry
import pl.touk.nussknacker.engine.modelconfig.ComponentsUiConfig
import pl.touk.nussknacker.engine.resultcollector.ProductionServiceInvocationCollector
import pl.touk.nussknacker.engine.testing.ModelDefinitionBuilder
import pl.touk.nussknacker.engine.util.Implicits._
import pl.touk.nussknacker.engine.{CustomProcessValidatorLoader, InterpretationResult, api}
import pl.touk.nussknacker.engine.{ComponentUseCase, CustomProcessValidatorLoader, InterpretationResult, api}

import scala.language.higherKinds
import scala.reflect.ClassTag
Expand Down Expand Up @@ -74,7 +78,8 @@ class InterpreterSetup[T: ClassTag] {
getClass.getClassLoader,
ProductionServiceInvocationCollector,
ComponentUseCase.EngineRuntime,
CustomProcessValidatorLoader.emptyCustomProcessValidator
CustomProcessValidatorLoader.emptyCustomProcessValidator,
NodesDeploymentData.empty,
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package pl.touk.nussknacker.engine.api

import pl.touk.nussknacker.engine.api.component.NodesDeploymentData.NodeDeploymentData
import pl.touk.nussknacker.engine.api.component.{AllProcessingModesComponent, Component}
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -38,8 +37,7 @@ trait ServiceInvoker {
def invoke(context: Context)(
implicit ec: ExecutionContext,
collector: ServiceInvocationCollector,
componentUseCase: ComponentUseCase,
nodeDeploymentData: NodeDeploymentData,
componentUseContext: ComponentUseContext,
): Future[Any]

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package pl.touk.nussknacker.engine.api.process

import pl.touk.nussknacker.engine.api.component.NodesDeploymentData.NodeDeploymentData
import pl.touk.nussknacker.engine.api.process.ComponentUseContext.EngineRuntime

/**
* Specifies the mode a node is used/invoked. It can be one of the following values:
* <ul>
Expand All @@ -10,13 +13,18 @@ package pl.touk.nussknacker.engine.api.process
* <li>TestDataGeneration - used when compiling, but only for purpose of generating test data. Components should not be invoked in this mode.</li>
* </ul>
*/
sealed trait ComponentUseCase
sealed trait ComponentUseContext {
def deploymentData(): Option[NodeDeploymentData] = this match {
case EngineRuntime(nodeData) => Some(nodeData)
case _ => None
}
}

object ComponentUseCase {
case object EngineRuntime extends ComponentUseCase
case object TestRuntime extends ComponentUseCase
case object Validation extends ComponentUseCase
case object ServiceQuery extends ComponentUseCase
case object TestDataGeneration extends ComponentUseCase
object ComponentUseContext {
case class EngineRuntime(nodeData: NodeDeploymentData) extends ComponentUseContext
case object TestRuntime extends ComponentUseContext
case object Validation extends ComponentUseContext
case object ServiceQuery extends ComponentUseContext
case object TestDataGeneration extends ComponentUseContext

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, Outcome}
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.api.test.EmptyInvocationCollector.Instance
import pl.touk.nussknacker.engine.api.typed.TypedMap
import pl.touk.nussknacker.engine.util.ResourceLoader
Expand All @@ -17,7 +17,7 @@ import pl.touk.nussknacker.engine.util.service.EagerServiceWithStaticParametersA
import pl.touk.nussknacker.http.backend.FixedAsyncHttpClientBackendProvider
import pl.touk.nussknacker.openapi.enrichers.{SwaggerEnricherCreator, SwaggerEnrichers}
import pl.touk.nussknacker.openapi.parser.SwaggerParser
import pl.touk.nussknacker.openapi.{ApiKeySecret, OpenAPIServicesConfig, SecurityConfig, SecuritySchemeName}
import pl.touk.nussknacker.openapi.{ApiKeySecret, OpenAPIServicesConfig, SecuritySchemeName}
import pl.touk.nussknacker.test.PatientScalaFutures

import java.net.URL
Expand All @@ -31,9 +31,9 @@ class OpenAPIServiceSpec
with LazyLogging
with PatientScalaFutures {

implicit val componentUseCase: ComponentUseCase = ComponentUseCase.EngineRuntime
implicit val metaData: MetaData = MetaData("testProc", StreamMetaData())
implicit val context: Context = Context("testContextId", Map.empty)
implicit val componentUseCase: ComponentUseContext = ComponentUseContext.EngineRuntime(Map.empty)
implicit val metaData: MetaData = MetaData("testProc", StreamMetaData())
implicit val context: Context = Context("testContextId", Map.empty)
val jobData = JobData(metaData, ProcessVersion.empty.copy(processName = metaData.name))

type FixtureParam = EagerServiceWithStaticParametersAndReturnType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package pl.touk.nussknacker.openapi.enrichers
import org.asynchttpclient.DefaultAsyncHttpClient
import pl.touk.nussknacker.engine.api.definition.Parameter
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector
import pl.touk.nussknacker.engine.api.typed.typing
Expand Down Expand Up @@ -63,7 +63,7 @@ class SwaggerEnricher(
collector: ServiceInvocationCollector,
contextId: ContextId,
metaData: MetaData,
componentUseCase: ComponentUseCase
componentUseCase: ComponentUseContext
): Future[AnyRef] =
measuring {
swaggerHttpService.invoke(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package pl.touk.nussknacker.openapi
import cats.data.Validated
import cats.data.Validated.{Invalid, Valid}
import org.apache.commons.io.IOUtils
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData
import pl.touk.nussknacker.engine.util.runtimecontext.TestEngineRuntimeContext
import pl.touk.nussknacker.engine.util.service.EagerServiceWithStaticParametersAndReturnType
import pl.touk.nussknacker.openapi.enrichers.{SwaggerEnricherCreator, SwaggerEnrichers}
Expand All @@ -19,9 +20,9 @@ trait BaseOpenAPITest {

protected val baseConfig: OpenAPIServicesConfig = OpenAPIServicesConfig(new URL("http://foo"))

implicit val componentUseCase: ComponentUseCase = ComponentUseCase.EngineRuntime
implicit val metaData: MetaData = MetaData("testProc", StreamMetaData())
implicit val context: Context = Context("testContextId", Map.empty)
implicit val componentUseCase: ComponentUseContext = ComponentUseContext.EngineRuntime(NodesDeploymentData.empty)
implicit val metaData: MetaData = MetaData("testProc", StreamMetaData())
implicit val context: Context = Context("testContextId", Map.empty)
private val jobData = JobData(metaData, ProcessVersion.empty.copy(processName = metaData.name))
private val runtimeContext = TestEngineRuntimeContext(jobData)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package pl.touk.nussknacker.sql.service

import pl.touk.nussknacker.engine.api.component.NodesDeploymentData.NodeDeploymentData
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.api.{Context, Params, ServiceInvoker}
import pl.touk.nussknacker.engine.util.service.AsyncExecutionTimeMeasurement
import pl.touk.nussknacker.sql.db.WithDBConnectionPool
Expand Down Expand Up @@ -35,8 +33,7 @@ class DatabaseEnricherInvoker(
override def invoke(context: Context)(
implicit ec: ExecutionContext,
collector: ServiceInvocationCollector,
componentUseCase: ComponentUseCase,
nodeDeploymentData: NodeDeploymentData,
componentUseContext: ComponentUseContext,
): Future[queryExecutor.QueryResult] = {
getTimeMeasurement().measuring {
queryDatabase(queryArgumentsExtractor(argsCount, params, context))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package pl.touk.nussknacker.sql.service

import com.github.benmanes.caffeine.cache.{AsyncCache, Caffeine}
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData.NodeDeploymentData
import pl.touk.nussknacker.engine.api.{Context, Params}
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.api.{Context, Params}
import pl.touk.nussknacker.engine.util.service.AsyncExecutionTimeMeasurement
import pl.touk.nussknacker.sql.db.query.{QueryArguments, QueryResultStrategy}
import pl.touk.nussknacker.sql.db.schema.TableDefinition
Expand Down Expand Up @@ -54,8 +52,7 @@ class DatabaseEnricherInvokerWithCache(
override def invoke(context: Context)(
implicit ec: ExecutionContext,
collector: ServiceInvocationCollector,
componentUseCase: ComponentUseCase,
nodeDeploymentData: NodeDeploymentData,
componentUseContext: ComponentUseContext,
): Future[queryExecutor.QueryResult] = {
getTimeMeasurement().measuring {
val queryArguments = queryArgumentsExtractor(argsCount, params, context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData.NodeDeploymentData
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.EmptyProcess
import pl.touk.nussknacker.engine.api.context.ValidationContext
import pl.touk.nussknacker.engine.api.context.transformation.{FailedToDefineParameter, OutputVariableNameValue}
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.api.test.EmptyInvocationCollector
import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector
import pl.touk.nussknacker.engine.api.typed.typing
Expand All @@ -20,12 +19,11 @@ import scala.concurrent.ExecutionContext

trait BaseDatabaseQueryEnricherTest extends AnyFunSuite with Matchers with BeforeAndAfterAll {

implicit val ec: ExecutionContext = ExecutionContext.Implicits.global
implicit val context: Context = Context("", Map.empty)
implicit val metaData: MetaData = MetaData("", StreamMetaData())
implicit val collector: ServiceInvocationCollector = EmptyInvocationCollector.Instance
implicit val componentUseCase: ComponentUseCase = ComponentUseCase.TestRuntime
implicit val nodeDeploymentData: NodeDeploymentData = Map.empty
implicit val ec: ExecutionContext = ExecutionContext.Implicits.global
implicit val context: Context = Context("", Map.empty)
implicit val metaData: MetaData = MetaData("", StreamMetaData())
implicit val collector: ServiceInvocationCollector = EmptyInvocationCollector.Instance
implicit val componentUseContext: ComponentUseContext = ComponentUseContext.TestRuntime

val jobData: JobData = JobData(metaData, ProcessVersion.empty.copy(processName = metaData.name))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import pl.touk.nussknacker.engine.api.parameter.{
ValueInputWithFixedValuesProvided
}
import pl.touk.nussknacker.engine.api.process.{
ComponentUseCase,
ComponentUseContext,
EmptyProcessConfigCreator,
ExpressionConfig,
ProcessName,
Expand Down Expand Up @@ -2579,7 +2579,7 @@ private object UIProcessValidatorSpec {
collector: ServiceInvocationCollector,
contextId: ContextId,
metaData: MetaData,
componentUseCase: ComponentUseCase
componentUseContext: ComponentUseContext
): Future[Any] = {
Future.successful(eagerParameters.head._2.toString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import pl.touk.nussknacker.engine.api.context.transformation._
import pl.touk.nussknacker.engine.api.definition._
import pl.touk.nussknacker.engine.api.generics.ExpressionParseError.TabularDataDefinitionParserErrorDetails
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.api.test.InvocationCollectors
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult, Unknown}
import pl.touk.nussknacker.engine.graph.expression.TabularTypedData
Expand Down Expand Up @@ -142,8 +142,7 @@ object DecisionTable extends EagerService with SingleInputDynamicComponent[Servi
override def invoke(context: Context)(
implicit ec: ExecutionContext,
collector: InvocationCollectors.ServiceInvocationCollector,
componentUseCase: ComponentUseCase,
nodeDeploymentData: NodeDeploymentData,
componentUseContext: ComponentUseContext,
): Future[Output] = Future {
filterRows(tabularData, context)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package pl.touk.nussknacker.engine.flink.api.process

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData.NodeDeploymentData
import pl.touk.nussknacker.engine.api.context.ValidationContext
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.process.ComponentUseContext
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult, Unknown}
import pl.touk.nussknacker.engine.api.{Context, JobData, MetaData, ValueWithContext}
Expand All @@ -25,8 +24,7 @@ case class FlinkCustomNodeContext(
exceptionHandlerPreparer: RuntimeContext => ExceptionHandler,
globalParameters: Option[NkGlobalParameters],
validationContext: Either[ValidationContext, Map[String, ValidationContext]],
componentUseCase: ComponentUseCase,
nodeDeploymentData: Option[NodeDeploymentData]
componentUseContext: ComponentUseContext,
) {
def metaData: MetaData = jobData.metaData

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.apache.flink.table.catalog.Column.{ComputedColumn, MetadataColumn, Ph
import org.apache.flink.types.Row
import pl.touk.nussknacker.engine.api.definition.Parameter
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.ComponentUseContext.EngineRuntime
import pl.touk.nussknacker.engine.api.process.{
BasicContextInitializer,
ContextInitializer,
Expand Down Expand Up @@ -50,7 +51,8 @@ class TableSource(

val selectQuery = tableEnv.from(tableDefinition.tableId.toString)

val finalQuery = flinkNodeContext.nodeDeploymentData
val finalQuery = flinkNodeContext.componentUseContext
.deploymentData()
.flatMap(_.get(SQL_EXPRESSION_PARAMETER_NAME))
.collect { case sqlExpression =>
tableEnv.executeSql(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package pl.touk.nussknacker.engine.process.compiler

import org.apache.flink.api.common.functions.RuntimeContext
import pl.touk.nussknacker.engine.ComponentUseCase
import pl.touk.nussknacker.engine.api.JobData
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, IncContextIdGenerator}
import pl.touk.nussknacker.engine.flink.api.FlinkEngineRuntimeContext
import pl.touk.nussknacker.engine.process.compiler.MetricsProviderForFlink.createMetricsProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import cats.data.Validated.{Invalid, Valid}
import cats.data._
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import pl.touk.nussknacker.engine.Interpreter
import pl.touk.nussknacker.engine.api.JobData
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext}
import pl.touk.nussknacker.engine.api.process.{AsyncExecutionContextPreparer, ComponentUseCase}
import pl.touk.nussknacker.engine.api.process.AsyncExecutionContextPreparer
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.compile.ProcessCompilerData
import pl.touk.nussknacker.engine.compile.nodecompilation.EvaluableLazyParameterCreatorDeps
Expand All @@ -16,6 +15,7 @@ import pl.touk.nussknacker.engine.compiledgraph.node.Node
import pl.touk.nussknacker.engine.graph.node.NodeData
import pl.touk.nussknacker.engine.process.exception.FlinkExceptionHandler
import pl.touk.nussknacker.engine.splittedgraph.splittednode.SplittedNode
import pl.touk.nussknacker.engine.{ComponentUseCase, Interpreter}

import scala.concurrent.duration.FiniteDuration

Expand All @@ -32,7 +32,7 @@ class FlinkProcessCompilerData(
exceptionHandler: FlinkExceptionHandler,
val asyncExecutionContextPreparer: AsyncExecutionContextPreparer,
val processTimeout: FiniteDuration,
val componentUseCase: ComponentUseCase
val componentUseCase: ComponentUseCase,
) {

def open(runtimeContext: RuntimeContext, nodesToUse: List[_ <: NodeData]): Unit = {
Expand Down
Loading

0 comments on commit 8a84e14

Please sign in to comment.