From 9b8479ba52609a5154ad1e067625866fad562ea2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Pr=C3=B3chniak?= Date: Wed, 21 Jul 2021 13:24:06 +0200 Subject: [PATCH] Rename ProcessManager to DeploymentManager (#1921) * ProcessManager -> DeploymentManager * review * review --- .github/workflows/pr.yml | 2 +- build.sbt | 18 ++-- docs/Architecture.md | 15 ++-- docs/Changelog.md | 2 +- docs/Engines.md | 4 +- docs/MigrationGuide.md | 7 +- docs/operations_guide/Operations.md | 2 +- ...sManager.scala => DeploymentManager.scala} | 2 +- .../genericmodel/RunGenericModelLocally.scala | 6 +- engine/flink/management/periodic/README.md | 4 +- ....scala => PeriodicDeploymentManager.scala} | 12 +-- ...> PeriodicDeploymentManagerProvider.scala} | 18 ++-- .../periodic/PeriodicProcessService.scala | 8 +- .../periodic/flink/FlinkJarManager.scala | 4 +- ...Stub.scala => DeploymentManagerStub.scala} | 2 +- ...la => PeriodicDeploymentManagerTest.scala} | 84 +++++++++---------- ...eriodicProcessServiceIntegrationTest.scala | 10 +-- .../periodic/PeriodicProcessServiceTest.scala | 12 +-- ...FlinkStreamingDeploymentManagerSpec.scala} | 32 +++---- .../FlinkStreamingProcessTestRunnerSpec.scala | 10 +-- ... => JavaConfigDeploymentManagerSpec.scala} | 8 +- .../streaming/StreamingDockerTest.scala | 6 +- ...ssknacker.engine.DeploymentManagerProvider | 1 + ....nussknacker.engine.ProcessManagerProvider | 1 - ...ger.scala => FlinkDeploymentManager.scala} | 8 +- .../engine/management/FlinkRestManager.scala | 2 +- ...kStreamingDeploymentManagerProvider.scala} | 14 ++-- .../management/FlinkRestManagerSpec.scala | 2 +- ...ssknacker.engine.DeploymentManagerProvider | 1 + ....nussknacker.engine.ProcessManagerProvider | 1 - ....scala => DeploymentManagerProvider.scala} | 24 +++--- ...Stub.scala => DeploymentManagerStub.scala} | 10 +-- ... => StandaloneDeploymentManagerSpec.scala} | 6 +- ...ssknacker.engine.DeploymentManagerProvider | 1 + ....nussknacker.engine.ProcessManagerProvider | 1 - .../deployment/DeploymentService.scala | 4 +- ...cala => StandaloneDeploymentManager.scala} | 16 ++-- ui/README.md | 8 +- ui/buildServer.sh | 4 +- ui/runServer.sh | 2 +- .../touk/nussknacker/ui/NussknackerApp.scala | 2 +- .../ui/api/DefinitionResources.scala | 2 +- .../ui/api/ProcessesResources.scala | 8 +- .../definition/UIProcessObjectsFactory.scala | 6 +- .../process/deployment/ManagementActor.scala | 42 +++++----- .../ProcessingTypeDataReader.scala | 4 +- .../LocalNussknackerWithSingleModel.scala | 6 +- .../ui/api/ManagementResourcesSpec.scala | 12 +-- .../ui/api/ProcessesChangeListenerSpec.scala | 2 +- .../ui/api/ProcessesResourcesSpec.scala | 8 +- .../ui/api/helpers/EspItTest.scala | 16 ++-- .../ui/api/helpers/TestFactory.scala | 8 +- .../UIProcessObjectsFactorySpec.scala | 10 +-- .../deployment/ManagementActorSpec.scala | 48 +++++------ 54 files changed, 275 insertions(+), 273 deletions(-) rename engine/api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/{ProcessManager.scala => DeploymentManager.scala} (96%) rename engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/{PeriodicProcessManager.scala => PeriodicDeploymentManager.scala} (96%) rename engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/{PeriodicProcessManagerProvider.scala => PeriodicDeploymentManagerProvider.scala} (76%) rename engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/{ProcessManagerStub.scala => DeploymentManagerStub.scala} (97%) rename engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/{PeriodicProcessManagerTest.scala => PeriodicDeploymentManagerTest.scala} (70%) rename engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/{FlinkStreamingProcessManagerSpec.scala => FlinkStreamingDeploymentManagerSpec.scala} (85%) rename engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/{JavaConfigProcessManagerSpec.scala => JavaConfigDeploymentManagerSpec.scala} (77%) create mode 100644 engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider delete mode 100644 engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider rename engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/{FlinkProcessManager.scala => FlinkDeploymentManager.scala} (95%) rename engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/{FlinkStreamingProcessManagerProvider.scala => FlinkStreamingDeploymentManagerProvider.scala} (71%) create mode 100644 engine/interpreter/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider delete mode 100644 engine/interpreter/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider rename engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/{ProcessManagerProvider.scala => DeploymentManagerProvider.scala} (66%) rename engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/testing/{ProcessManagerStub.scala => DeploymentManagerStub.scala} (84%) rename engine/standalone/engine/src/it/scala/pl/touk/nussknacker/engine/standalone/management/{StandaloneProcessManagerSpec.scala => StandaloneDeploymentManagerSpec.scala} (88%) create mode 100644 engine/standalone/engine/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider delete mode 100644 engine/standalone/engine/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider rename engine/standalone/engine/src/main/scala/pl/touk/nussknacker/engine/standalone/management/{StandaloneProcessManager.scala => StandaloneDeploymentManager.scala} (94%) diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 275b380fada..13f78f64fce 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -148,7 +148,7 @@ jobs: run: tar xfz target.tgz - name: Integration tests shell: bash - run: ./ciRunSbt.sh flinkProcessManager/it:test engineStandalone/it:test processReports/it:test security/it:test + run: ./ciRunSbt.sh flinkDeploymentManager/it:test engineStandalone/it:test processReports/it:test security/it:test slowTests: name: Slow tests runs-on: ubuntu-latest diff --git a/build.sbt b/build.sbt index aeb3636d5f2..f559f07ca95 100644 --- a/build.sbt +++ b/build.sbt @@ -339,14 +339,14 @@ lazy val dist = { packageName in Universal := ("nussknacker" + "-" + version.value), Keys.compile in Compile := (Keys.compile in Compile).dependsOn( (assembly in Compile) in generic, - (assembly in Compile) in flinkProcessManager, + (assembly in Compile) in flinkDeploymentManager, (assembly in Compile) in engineStandalone, (assembly in Compile) in openapi, (assembly in Compile) in sql, ).value, mappings in Universal ++= Seq( (crossTarget in generic).value / "genericModel.jar" -> "model/genericModel.jar", - (crossTarget in flinkProcessManager).value / "nussknacker-flink-manager.jar" -> "managers/nussknacker-flink-manager.jar", + (crossTarget in flinkDeploymentManager).value / "nussknacker-flink-manager.jar" -> "managers/nussknacker-flink-manager.jar", (crossTarget in engineStandalone).value / "nussknacker-standalone-manager.jar" -> "managers/nussknacker-standalone-manager.jar", (crossTarget in openapi).value / "openapi.jar" -> "components/openapi.jar", (crossTarget in sql).value / "sql.jar" -> "components/sql.jar" @@ -450,7 +450,7 @@ lazy val standaloneApp = (project in engine("standalone/app")). dependsOn(engineStandalone, interpreter, httpUtils, testUtil % "test", standaloneUtil % "test") -lazy val flinkProcessManager = (project in engine("flink/management")). +lazy val flinkDeploymentManager = (project in engine("flink/management")). configs(IntegrationTest). settings(commonSettings). settings(Defaults.itSettings). @@ -483,7 +483,7 @@ lazy val flinkProcessManager = (project in engine("flink/management")). httpUtils % "provided", kafkaTestUtil % "it,test") -lazy val flinkPeriodicProcessManager = (project in engine("flink/management/periodic")). +lazy val flinkPeriodicDeploymentManager = (project in engine("flink/management/periodic")). settings(commonSettings). settings(assemblySettings("nussknacker-flink-periodic-manager.jar", includeScala = false): _*). settings( @@ -497,7 +497,7 @@ lazy val flinkPeriodicProcessManager = (project in engine("flink/management/peri "com.cronutils" % "cron-utils" % cronParserV ) } - ).dependsOn(flinkProcessManager, + ).dependsOn(flinkDeploymentManager, interpreter % "provided", api % "provided", httpUtils % "provided", @@ -1034,7 +1034,7 @@ lazy val ui = (project in file("ui/server")) "org.slf4j" % "log4j-over-slf4j" % slf4jV, "com.carrotsearch" % "java-sizeof" % "0.0.5", - //It's needed by flinkProcessManager which has disabled includingScala + //It's needed by flinkDeploymentManager which has disabled includingScala "org.scala-lang" % "scala-compiler" % scalaVersion.value, "org.scala-lang" % "scala-reflect" % scalaVersion.value, @@ -1057,7 +1057,7 @@ lazy val ui = (project in file("ui/server")) //TODO: this is unfortunatelly needed to run without too much hassle in Intellij... //provided dependency of kafka is workaround for Idea, which is not able to handle test scope on module dependency //otherwise it is (wrongly) added to classpath when running UI from Idea - flinkProcessManager % "provided" , + flinkDeploymentManager % "provided" , kafka % "provided", engineStandalone % "provided" ) @@ -1100,7 +1100,7 @@ lazy val bom = (project in file("bom")) ).dependsOn(modules.map(k => k:ClasspathDep[ProjectReference]):_*) lazy val modules = List[ProjectReference]( - engineStandalone, standaloneApp, flinkProcessManager, flinkPeriodicProcessManager, standaloneSample, flinkManagementSample, managementJavaSample, generic, + engineStandalone, standaloneApp, flinkDeploymentManager, flinkPeriodicDeploymentManager, standaloneSample, flinkManagementSample, managementJavaSample, generic, openapi, process, interpreter, benchmarks, kafka, avroFlinkUtil, kafkaFlinkUtil, kafkaTestUtil, util, testUtil, flinkUtil, flinkModelUtil, flinkTestUtil, standaloneUtil, standaloneApi, api, security, flinkApi, processReports, httpUtils, queryableState, restmodel, listenerApi, ui, sql @@ -1139,4 +1139,4 @@ lazy val root = (project in file(".")) ) addCommandAlias("assemblySamples", ";flinkManagementSample/assembly;standaloneSample/assembly;generic/assembly") -addCommandAlias("assemblyEngines", ";flinkProcessManager/assembly;engineStandalone/assembly") +addCommandAlias("assemblyDeploymentManagers", ";flinkDeploymentManager/assembly;engineStandalone/assembly") diff --git a/docs/Architecture.md b/docs/Architecture.md index 2f829c092b7..344896c5623 100644 --- a/docs/Architecture.md +++ b/docs/Architecture.md @@ -2,18 +2,19 @@ Nussknacker consists of three parts: -**engines** - are **libraries** which transform internal json process representation (process graph) into jobs. For example Flink engine generates Flink specific code, compiles and packages all needed components into JAR file for execution, and then runs the job via Flink REST API. +**engines** - are **libraries** which transform internal json scenario representation (scenario graph) into jobs. For example Flink engine generates Flink specific code, compiles and packages all needed components into JAR file for execution, and then runs the job via Flink REST API. -**ui** - is a standalone **application** which allows users to design process diagrams and to deploy them into runtime environments. +**ui** - is a standalone **application** which allows users to design scenario diagrams and to deploy them into runtime environments. **integrations** - are your application specific classes like your model, http services, or custom stateful components. -##Engine -Engine consists of various modules that enable creation of processes building blocks in UI and interpretation of process diagrams. Engines implement `ProcessManagerProvider`. +## Engines +Engine consists of various modules that enable creation of scenarios building blocks in UI and interpretation of scenario diagrams. +Scenarios can be deployed on engine (like Flink) using `DeploymentManager` - e.g., for Flink it controls job submission to particular Flink cluster. -##UI -The **ui** application is a simple application written using Scala, Akka Http and Slick on the backend side and ReactJS on the front. Processes, their history, comments and other metadata are persisted in relational database (by default it's simple embedded H2). UI communicates with Apache Flink cluster using embedded Flink client. +## UI +The **ui** application is a simple application written using Scala, Akka Http and Slick on the backend side and ReactJS on the front. Scenarios, their history, comments and other metadata are persisted in relational database (by default it's simple embedded H2). UI communicates with Apache Flink cluster using embedded Flink client. -##Integrations +## Integrations Integrations module implements `ProcessConfigCreator` interface which is an entry point of Nussknacker. See [API](API.md) for more datails. \ No newline at end of file diff --git a/docs/Changelog.md b/docs/Changelog.md index a37f9ab4899..55233fa2e8c 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -61,8 +61,8 @@ Nussknacker versions * Various naming changes: * [#1917](https://github.com/TouK/nussknacker/pull/1917) configuration of `engineConfig` to `deploymentConfig` * [#1911](https://github.com/TouK/nussknacker/pull/1911) Rename `process` to `scenario`, `subprocess` to `fragment` in messages at backend and some test cases names + * [#1921](https://github.com/TouK/nussknacker/pull/1921) `ProcessManager` to `DeploymentManager` * [#1927](https://github.com/TouK/nussknacker/pull/1927) Rename `outer-join` to `single-side-join` - 0.3.1 (not released yet) ------------------------ diff --git a/docs/Engines.md b/docs/Engines.md index 25bc78caa52..7e6ffdcf3d3 100644 --- a/docs/Engines.md +++ b/docs/Engines.md @@ -1,11 +1,11 @@ Engines ======= Nussknacker was created as GUI for Flink. However, it's also possible to use it to connect to other runtimes. -```ProcessManager``` and ```ProcessManagerProvider``` interfaces were created to facilitate this. +```DeploymentManager``` and ```DeploymentManagerProvider``` interfaces were created to facilitate this. In particular, we provide experimental standalone engine, which allows using Nussknacker as method of creating REST APIs To create/customize Nussknacker engine you have to: -- Implement ```ProcessManagerProvider``` interface and register it with ServiceLoader mechanism. +- Implement ```DeploymentManagerProvider``` interface and register it with ServiceLoader mechanism. Each provider should have unique ```name```. - Put implementation with all needed libraries on Nussknacker classpath. - Configure appropriate model to use your process manager. diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index 8df5379d9eb..eef7ff0b5c1 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -152,6 +152,10 @@ To see biggest differences please consult the [changelog](Changelog.md). doesn't emit full context of variables that were before node (because of performance reasons and because that wasn't obvious which one context is emitted). If you want to emit some information other than aggregated value and key (availabled via new `#key` variable), you should use `#AGG.map` expression in `aggregateBy`. * [#1910](https://github.com/TouK/nussknacker/pull/1910) `processTypes` renamed to `scenarioTypes`. You can still use old `processTypes` configuration. Old configuration will be removed in version `0.5.0`. +* Various naming changes: + * [#1917](https://github.com/TouK/nussknacker/pull/1917) configuration of `engineConfig` to `deploymentConfig` + * [#1921](https://github.com/TouK/nussknacker/pull/1921) `ProcessManager` to `DeploymentManager` + * [#1927](https://github.com/TouK/nussknacker/pull/1927) Rename `outer-join` to `single-side-join` ## In version 0.3.0 @@ -182,9 +186,6 @@ that will be hidden before parameter's evaluation - Removed: `getClusterClient` from `FlinkMiniClusterHolder` interface, because of flink compatibility at Flink 1.9 - Renamed: `FlinkStreamingProcessRegistrar` to `FlinkProcessManager` * [#1303](https://github.com/TouK/nussknacker/pull/1303) TypedObjectTypingResult has a new field: additionalInfo -* Various naming changes: - * [#1917](https://github.com/TouK/nussknacker/pull/1917) configuration of `engineConfig` to `deploymentConfig` - * [#1927](https://github.com/TouK/nussknacker/pull/1927) Rename `outer-join` to `single-side-join` ## In version 0.2.0 diff --git a/docs/operations_guide/Operations.md b/docs/operations_guide/Operations.md index 9abc08b9810..a4177361bcb 100644 --- a/docs/operations_guide/Operations.md +++ b/docs/operations_guide/Operations.md @@ -71,7 +71,7 @@ It’s possible to configure Nussknacker installation to use other metrics setup ### Common Flink configuration issues -Nussknacker assumes that the Flink Session Cluster is used (it should be possible to write own, custom `ProcessManager` to deploy with Job/Application mode, +Nussknacker assumes that the Flink Session Cluster is used (it should be possible to write own, custom `DeploymentManager` to deploy with Job/Application mode, but this is out of scope of this guide). It usually happens (especially for large deployments) that the Flink cluster used with Nusssknacker has quite a lot of jobs (each representing one scenario), many of them are quite small in terms of needed resources - this is different to usual Flink setup, where a cluster has one or few jobs. diff --git a/engine/api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessManager.scala b/engine/api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala similarity index 96% rename from engine/api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessManager.scala rename to engine/api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala index 2b7c872255b..f228dcb7857 100644 --- a/engine/api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessManager.scala +++ b/engine/api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.process.ProcessName import scala.concurrent.Future -trait ProcessManager extends AutoCloseable { +trait DeploymentManager extends AutoCloseable { //TODO: savepointPath is very flink specific, how can we handle that differently? def deploy(processVersion: ProcessVersion, deploymentData: DeploymentData, processDeploymentData: ProcessDeploymentData, savepointPath: Option[String]) : Future[Option[ExternalDeploymentId]] diff --git a/engine/flink/generic/src/test/scala/pl/touk/nussknacker/genericmodel/RunGenericModelLocally.scala b/engine/flink/generic/src/test/scala/pl/touk/nussknacker/genericmodel/RunGenericModelLocally.scala index b9e26cbfa87..f948cdc6d72 100644 --- a/engine/flink/generic/src/test/scala/pl/touk/nussknacker/genericmodel/RunGenericModelLocally.scala +++ b/engine/flink/generic/src/test/scala/pl/touk/nussknacker/genericmodel/RunGenericModelLocally.scala @@ -1,8 +1,8 @@ package pl.touk.nussknacker.genericmodel import com.typesafe.config.ConfigFactory -import pl.touk.nussknacker.engine.ProcessManagerProvider -import pl.touk.nussknacker.engine.testing.{LocalModelData, ProcessManagerProviderStub} +import pl.touk.nussknacker.engine.DeploymentManagerProvider +import pl.touk.nussknacker.engine.testing.{LocalModelData, DeploymentManagerProviderStub} import pl.touk.nussknacker.ui.util.LocalNussknackerWithSingleModel //Sample app to simplify local development. @@ -13,7 +13,7 @@ object RunGenericModelLocally extends App { val managerConfig = ConfigFactory.empty() //For simplicity we use stub here, one can add real Flink implementation after add appropriate dependencies - val provider: ProcessManagerProvider = new ProcessManagerProviderStub + val provider: DeploymentManagerProvider = new DeploymentManagerProviderStub LocalNussknackerWithSingleModel.run(modelData, provider, managerConfig, Set("Default")) } diff --git a/engine/flink/management/periodic/README.md b/engine/flink/management/periodic/README.md index f78b1aa30ab..465baf337a4 100644 --- a/engine/flink/management/periodic/README.md +++ b/engine/flink/management/periodic/README.md @@ -10,10 +10,10 @@ process is scheduled to be run again according to the schedule. ## Usage -- Implement `ProcessManagerProvider` using `PeriodicProcessManagerProvider`. Following components need to provided: +- Implement `DeploymentManagerProvider` using `PeriodicDeploymentManagerProvider`. Following components need to provided: - Underlying engine, currently only Flink is supported. - Optional `SchedulePropertyExtractor` to determine how to construct an instance of a periodic property. By default a cron expression set in process properties is used to describe when a process should be run. - Optional `EnrichDeploymentWithJarDataFactory` if you would like to, for example, extend process configuration, by default nothing is done. -- Add service provider with your `ProcessManagerProvider` implementation. +- Add service provider with your `DeploymentManagerProvider` implementation. diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala similarity index 96% rename from engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessManager.scala rename to engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala index a92ec47e27f..14f7f4f5b91 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala @@ -23,8 +23,8 @@ import sttp.client.{NothingT, SttpBackend} import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} -object PeriodicProcessManager { - def apply(delegate: ProcessManager, +object PeriodicDeploymentManager { + def apply(delegate: DeploymentManager, schedulePropertyExtractor: SchedulePropertyExtractor, enrichDeploymentWithJarDataFactory: EnrichDeploymentWithJarDataFactory, periodicBatchConfig: PeriodicBatchConfig, @@ -32,7 +32,7 @@ object PeriodicProcessManager { originalConfig: Config, modelData: ModelData, listenerFactory: PeriodicProcessListenerFactory, - additionalDeploymentDataProvider: AdditionalDeploymentDataProvider): PeriodicProcessManager = { + additionalDeploymentDataProvider: AdditionalDeploymentDataProvider): PeriodicDeploymentManager = { implicit val system: ActorSystem = ActorSystem("periodic-process-manager-provider") implicit val ec: ExecutionContext = ExecutionContext.global implicit val backend: SttpBackend[Future, Nothing, NothingT] = AsyncHttpClientFutureBackend.usingConfigBuilder { builder => @@ -56,15 +56,15 @@ object PeriodicProcessManager { Await.ready(backend.close(), 10 seconds) () } - new PeriodicProcessManager(delegate, service, schedulePropertyExtractor, toClose) + new PeriodicDeploymentManager(delegate, service, schedulePropertyExtractor, toClose) } } -class PeriodicProcessManager(val delegate: ProcessManager, +class PeriodicDeploymentManager(val delegate: DeploymentManager, service: PeriodicProcessService, schedulePropertyExtractor: SchedulePropertyExtractor, toClose: () => Unit) - (implicit val ec: ExecutionContext) extends ProcessManager with LazyLogging { + (implicit val ec: ExecutionContext) extends DeploymentManager with LazyLogging { override def deploy(processVersion: ProcessVersion, deploymentData: DeploymentData, diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessManagerProvider.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerProvider.scala similarity index 76% rename from engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessManagerProvider.scala rename to engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerProvider.scala index a6ebc2799de..3ca28e90066 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessManagerProvider.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerProvider.scala @@ -3,32 +3,32 @@ package pl.touk.nussknacker.engine.management.periodic import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import pl.touk.nussknacker.engine.api.TypeSpecificData -import pl.touk.nussknacker.engine.api.deployment.ProcessManager +import pl.touk.nussknacker.engine.api.deployment.DeploymentManager import pl.touk.nussknacker.engine.api.queryablestate.QueryableClient import pl.touk.nussknacker.engine.management.FlinkConfig import pl.touk.nussknacker.engine.management.periodic.service.{AdditionalDeploymentDataProvider, DefaultAdditionalDeploymentDataProvider, EmptyListener, EmptyPeriodicProcessListenerFactory, PeriodicProcessListener, PeriodicProcessListenerFactory} import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig -import pl.touk.nussknacker.engine.{ModelData, ProcessManagerProvider} +import pl.touk.nussknacker.engine.{ModelData, DeploymentManagerProvider} -class PeriodicProcessManagerProvider(delegate: ProcessManagerProvider, +class PeriodicDeploymentManagerProvider(delegate: DeploymentManagerProvider, schedulePropertyExtractor: SchedulePropertyExtractor = CronSchedulePropertyExtractor(), enrichDeploymentWithJarDataFactory: EnrichDeploymentWithJarDataFactory = EnrichDeploymentWithJarDataFactory.noOp, listenerFactory: PeriodicProcessListenerFactory = EmptyPeriodicProcessListenerFactory, additionalDeploymentDataProvider: AdditionalDeploymentDataProvider = DefaultAdditionalDeploymentDataProvider - ) extends ProcessManagerProvider with LazyLogging { + ) extends DeploymentManagerProvider with LazyLogging { override def name: String = s"${delegate.name}Periodic" - override def createProcessManager(modelData: ModelData, config: Config): ProcessManager = { + override def createDeploymentManager(modelData: ModelData, config: Config): DeploymentManager = { logger.info("Creating periodic scenario manager") - val delegateProcessManager = delegate.createProcessManager(modelData, config) + val delegateDeploymentManager = delegate.createDeploymentManager(modelData, config) import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ - val periodicBatchConfig = config.as[PeriodicBatchConfig]("processManager") + val periodicBatchConfig = config.as[PeriodicBatchConfig]("deploymentManager") val flinkConfig = config.rootAs[FlinkConfig] - PeriodicProcessManager( - delegate = delegateProcessManager, + PeriodicDeploymentManager( + delegate = delegateDeploymentManager, schedulePropertyExtractor = schedulePropertyExtractor, enrichDeploymentWithJarDataFactory = enrichDeploymentWithJarDataFactory, periodicBatchConfig = periodicBatchConfig, diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala index 2b2ed4fdb0b..a84815415be 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala @@ -15,7 +15,7 @@ import java.time.{Clock, LocalDateTime} import scala.concurrent.{ExecutionContext, Future} import scala.util.control.NonFatal -class PeriodicProcessService(delegateProcessManager: ProcessManager, +class PeriodicProcessService(delegateDeploymentManager: DeploymentManager, jarManager: JarManager, scheduledProcessesRepository: PeriodicProcessesRepository, periodicProcessListener: PeriodicProcessListener, @@ -91,7 +91,7 @@ class PeriodicProcessService(delegateProcessManager: ProcessManager, //Currently we don't allow simultaneous runs of one scenario - only sequential, so if other schedule kicks in, it'll have to wait private def checkIfNotRunning(toDeploy: PeriodicProcessDeployment): Future[Option[PeriodicProcessDeployment]] = { - delegateProcessManager.findJobStatus(toDeploy.periodicProcess.processVersion.processName).map { + delegateDeploymentManager.findJobStatus(toDeploy.periodicProcess.processVersion.processName).map { case Some(state) if state.isDeployed => logger.debug(s"Deferring run of ${toDeploy.display} as scenario is currently running") None @@ -102,7 +102,7 @@ class PeriodicProcessService(delegateProcessManager: ProcessManager, def handleFinished: Future[Unit] = { def handleSingleProcess(deployedProcess: PeriodicProcessDeployment): Future[Unit] = { - delegateProcessManager.findJobStatus(deployedProcess.periodicProcess.processVersion.processName).flatMap { state => + delegateDeploymentManager.findJobStatus(deployedProcess.periodicProcess.processVersion.processName).flatMap { state => handleFinishedAction(deployedProcess, state) .flatMap { needsReschedule => if (needsReschedule) reschedule(deployedProcess, state) else scheduledProcessesRepository.monad.pure(()).emptyCallback @@ -170,7 +170,7 @@ class PeriodicProcessService(delegateProcessManager: ProcessManager, } def deactivate(processName: ProcessName): Future[Unit] = for { - status <- delegateProcessManager.findJobStatus(processName) + status <- delegateDeploymentManager.findJobStatus(processName) maybePeriodicDeployment <- getLatestDeployment(processName) actionResult <- maybePeriodicDeployment match { case Some(periodicDeployment) => handleFinishedAction(periodicDeployment, status) diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala index 384e4e50365..74b9d72b135 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala @@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.api.deployment.{DeploymentData, ExternalDeploy import pl.touk.nussknacker.engine.management.periodic.model.DeploymentWithJarData import pl.touk.nussknacker.engine.management.periodic.{EnrichDeploymentWithJarData, JarManager, PeriodicBatchConfig, model} import pl.touk.nussknacker.engine.management.rest.{FlinkClient, HttpFlinkClient} -import pl.touk.nussknacker.engine.management.{FlinkConfig, FlinkModelJar, FlinkProcessManager, FlinkStreamingRestManager} +import pl.touk.nussknacker.engine.management.{FlinkConfig, FlinkModelJar, FlinkDeploymentManager, FlinkStreamingRestManager} import pl.touk.nussknacker.engine.modelconfig.InputConfigDuringExecution import sttp.client.{NothingT, SttpBackend} @@ -70,7 +70,7 @@ private[periodic] class FlinkJarManager(flinkClient: FlinkClient, val processVersion = deploymentWithJarData.processVersion logger.info(s"Deploying scenario ${processVersion.processName.value}, version id: ${processVersion.versionId} and jar: ${deploymentWithJarData.jarFileName}") val jarFile = jarsDir.resolve(deploymentWithJarData.jarFileName).toFile - val args = FlinkProcessManager.prepareProgramArgs(deploymentWithJarData.modelConfig, + val args = FlinkDeploymentManager.prepareProgramArgs(deploymentWithJarData.modelConfig, processVersion, deploymentData, GraphProcess(deploymentWithJarData.processJson)) diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/ProcessManagerStub.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentManagerStub.scala similarity index 97% rename from engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/ProcessManagerStub.scala rename to engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentManagerStub.scala index 00116900ebf..7420756d3f6 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/ProcessManagerStub.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentManagerStub.scala @@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.management.FlinkProcessStateDefinitionManager import scala.concurrent.Future -class ProcessManagerStub extends ProcessManager { +class DeploymentManagerStub extends DeploymentManager { var jobStatus: Option[ProcessState] = None diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessManagerTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala similarity index 70% rename from engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessManagerTest.scala rename to engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala index b9b8de865ad..56d72cb3f6c 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessManagerTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala @@ -16,7 +16,7 @@ import pl.touk.nussknacker.test.PatientScalaFutures import java.time.Clock import scala.concurrent.Await -class PeriodicProcessManagerTest extends FunSuite +class PeriodicDeploymentManagerTest extends FunSuite with Matchers with ScalaFutures with OptionValues @@ -33,29 +33,29 @@ class PeriodicProcessManagerTest extends FunSuite class Fixture { val repository = new db.InMemPeriodicProcessesRepository - val delegateProcessManagerStub = new ProcessManagerStub + val delegateDeploymentManagerStub = new DeploymentManagerStub val jarManagerStub = new JarManagerStub val periodicProcessService = new PeriodicProcessService( - delegateProcessManager = delegateProcessManagerStub, + delegateDeploymentManager = delegateDeploymentManagerStub, jarManager = jarManagerStub, scheduledProcessesRepository = repository, EmptyListener, DefaultAdditionalDeploymentDataProvider, Clock.systemDefaultZone() ) - val periodicProcessManager = new PeriodicProcessManager( - delegate = delegateProcessManagerStub, + val periodicDeploymentManager = new PeriodicDeploymentManager( + delegate = delegateDeploymentManagerStub, service = periodicProcessService, schedulePropertyExtractor = CronSchedulePropertyExtractor(), toClose = () => () ) - def getAllowedActions: List[ProcessActionType] = periodicProcessManager.findJobStatus(processName).futureValue.value.allowedActions + def getAllowedActions: List[ProcessActionType] = periodicDeploymentManager.findJobStatus(processName).futureValue.value.allowedActions } test("findJobStatus - should return none for no job") { val f = new Fixture - val state = f.periodicProcessManager.findJobStatus(processName).futureValue + val state = f.periodicDeploymentManager.findJobStatus(processName).futureValue state shouldBe 'empty } @@ -64,7 +64,7 @@ class PeriodicProcessManagerTest extends FunSuite val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Scheduled) - val state = f.periodicProcessManager.findJobStatus(processName).futureValue + val state = f.periodicDeploymentManager.findJobStatus(processName).futureValue val status = state.value.status status shouldBe a[ScheduledStatus] @@ -75,9 +75,9 @@ class PeriodicProcessManagerTest extends FunSuite test("findJobStatus - should be scheduled when scenario scheduled and job finished on Flink") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Scheduled) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Finished) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Finished) - val state = f.periodicProcessManager.findJobStatus(processName).futureValue + val state = f.periodicDeploymentManager.findJobStatus(processName).futureValue val status = state.value.status status shouldBe a[ScheduledStatus] @@ -87,9 +87,9 @@ class PeriodicProcessManagerTest extends FunSuite test("findJobStatus - should be running when scenario deployed and job running on Flink") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Running) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Running) - val state = f.periodicProcessManager.findJobStatus(processName).futureValue + val state = f.periodicDeploymentManager.findJobStatus(processName).futureValue val status = state.value.status status shouldBe FlinkStateStatus.Running @@ -99,9 +99,9 @@ class PeriodicProcessManagerTest extends FunSuite test("findJobStatus - should be waiting for reschedule if job finished on Flink but scenario is still deployed") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Finished) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Finished) - val state = f.periodicProcessManager.findJobStatus(processName).futureValue + val state = f.periodicDeploymentManager.findJobStatus(processName).futureValue val status = state.value.status status shouldBe WaitingForScheduleStatus @@ -112,7 +112,7 @@ class PeriodicProcessManagerTest extends FunSuite val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Failed) - val state = f.periodicProcessManager.findJobStatus(processName).futureValue + val state = f.periodicDeploymentManager.findJobStatus(processName).futureValue val status = state.value.status status shouldBe SimpleStateStatus.Failed @@ -122,7 +122,7 @@ class PeriodicProcessManagerTest extends FunSuite test("deploy - should fail for custom scenario") { val f = new Fixture - val deploymentResult = f.periodicProcessManager.deploy(processVersion, DeploymentData.empty, CustomProcess("test"), None) + val deploymentResult = f.periodicDeploymentManager.deploy(processVersion, DeploymentData.empty, CustomProcess("test"), None) intercept[PeriodicProcessException](Await.result(deploymentResult, patienceConfig.timeout)) } @@ -130,7 +130,7 @@ class PeriodicProcessManagerTest extends FunSuite test("deploy - should fail for invalid periodic property") { val f = new Fixture - val deploymentResult = f.periodicProcessManager.deploy(processVersion, DeploymentData.empty, GraphProcess("broken"), None) + val deploymentResult = f.periodicDeploymentManager.deploy(processVersion, DeploymentData.empty, GraphProcess("broken"), None) intercept[PeriodicProcessException](Await.result(deploymentResult, patienceConfig.timeout)) } @@ -138,7 +138,7 @@ class PeriodicProcessManagerTest extends FunSuite test("deploy - should schedule periodic scenario") { val f = new Fixture - f.periodicProcessManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue + f.periodicDeploymentManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue f.repository.processEntities.loneElement.active shouldBe true f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Scheduled @@ -148,7 +148,7 @@ class PeriodicProcessManagerTest extends FunSuite val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Scheduled) - f.periodicProcessManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue + f.periodicDeploymentManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue f.repository.processEntities should have size 2 f.repository.processEntities.map(_.active) shouldBe List(false, true) @@ -157,9 +157,9 @@ class PeriodicProcessManagerTest extends FunSuite test("should get status of failed job") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Failed) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Failed) - val state = f.periodicProcessManager.findJobStatus(processName).futureValue + val state = f.periodicDeploymentManager.findJobStatus(processName).futureValue val status = state.value.status status shouldBe SimpleStateStatus.Failed @@ -169,16 +169,16 @@ class PeriodicProcessManagerTest extends FunSuite test("should redeploy failed scenario") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Failed) - val failedProcessState = f.periodicProcessManager.findJobStatus(processName).futureValue.value + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Failed) + val failedProcessState = f.periodicDeploymentManager.findJobStatus(processName).futureValue.value failedProcessState.status shouldBe FlinkStateStatus.Failed failedProcessState.allowedActions shouldBe List(ProcessActionType.Cancel) // redeploy is blocked in GUI but API allows it - f.periodicProcessManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue + f.periodicDeploymentManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue f.repository.processEntities.map(_.active) shouldBe List(false, true) f.repository.deploymentEntities.map(_.status) shouldBe List(PeriodicProcessDeploymentStatus.Failed, PeriodicProcessDeploymentStatus.Scheduled) - val scheduledProcessState = f.periodicProcessManager.findJobStatus(processName).futureValue.value + val scheduledProcessState = f.periodicDeploymentManager.findJobStatus(processName).futureValue.value // Previous job is still visible as Failed. scheduledProcessState.status shouldBe a[ScheduledStatus] scheduledProcessState.allowedActions shouldBe List(ProcessActionType.Cancel, ProcessActionType.Deploy) @@ -189,7 +189,7 @@ class PeriodicProcessManagerTest extends FunSuite f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Scheduled) f.getAllowedActions shouldBe List(ProcessActionType.Cancel, ProcessActionType.Deploy) - f.periodicProcessManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue + f.periodicDeploymentManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue f.repository.processEntities.map(_.active) shouldBe List(false, true) f.repository.deploymentEntities.map(_.status) shouldBe List(PeriodicProcessDeploymentStatus.Scheduled, PeriodicProcessDeploymentStatus.Scheduled) @@ -198,10 +198,10 @@ class PeriodicProcessManagerTest extends FunSuite test("should redeploy running scenario") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Running) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Running) f.getAllowedActions shouldBe List(ProcessActionType.Cancel) // redeploy is blocked in GUI but API allows it - f.periodicProcessManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue + f.periodicDeploymentManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue f.repository.processEntities.map(_.active) shouldBe List(false, true) f.repository.deploymentEntities.map(_.status) shouldBe List(PeriodicProcessDeploymentStatus.Deployed, PeriodicProcessDeploymentStatus.Scheduled) @@ -210,10 +210,10 @@ class PeriodicProcessManagerTest extends FunSuite test("should redeploy finished scenario") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Finished) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Finished) f.getAllowedActions shouldBe List(ProcessActionType.Cancel) // redeploy is blocked in GUI but API allows it - f.periodicProcessManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue + f.periodicDeploymentManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue f.repository.processEntities.map(_.active) shouldBe List(false, true) f.repository.deploymentEntities.map(_.status) shouldBe List(PeriodicProcessDeploymentStatus.Finished, PeriodicProcessDeploymentStatus.Scheduled) @@ -222,53 +222,53 @@ class PeriodicProcessManagerTest extends FunSuite test("should cancel failed job after RescheduleActor handles finished") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Failed) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Failed) //this one is cyclically called by RescheduleActor f.periodicProcessService.handleFinished.futureValue - f.periodicProcessManager.findJobStatus(processName).futureValue.get.status shouldBe SimpleStateStatus.Failed + f.periodicDeploymentManager.findJobStatus(processName).futureValue.get.status shouldBe SimpleStateStatus.Failed f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed f.repository.processEntities.loneElement.active shouldBe true - f.periodicProcessManager.cancel(processName, User("test", "Tester")).futureValue + f.periodicDeploymentManager.cancel(processName, User("test", "Tester")).futureValue f.repository.processEntities.loneElement.active shouldBe false f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed - f.periodicProcessManager.findJobStatus(processName).futureValue.get.status shouldBe SimpleStateStatus.Canceled + f.periodicDeploymentManager.findJobStatus(processName).futureValue.get.status shouldBe SimpleStateStatus.Canceled } test("should cancel failed job before RescheduleActor handles finished") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Failed) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Failed) - f.periodicProcessManager.cancel(processName, User("test", "Tester")).futureValue + f.periodicDeploymentManager.cancel(processName, User("test", "Tester")).futureValue f.repository.processEntities.loneElement.active shouldBe false f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed - f.periodicProcessManager.findJobStatus(processName).futureValue.get.status shouldBe SimpleStateStatus.Canceled + f.periodicDeploymentManager.findJobStatus(processName).futureValue.get.status shouldBe SimpleStateStatus.Canceled } test("should cancel failed scenario after disappeared from Flink console") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Failed) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Failed) //this one is cyclically called by RescheduleActor f.periodicProcessService.handleFinished.futureValue //after some time Flink stops returning job status - f.delegateProcessManagerStub.jobStatus = None + f.delegateDeploymentManagerStub.jobStatus = None - f.periodicProcessManager.findJobStatus(processName).futureValue.get.status shouldBe SimpleStateStatus.Failed + f.periodicDeploymentManager.findJobStatus(processName).futureValue.get.status shouldBe SimpleStateStatus.Failed f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed f.repository.processEntities.loneElement.active shouldBe true - f.periodicProcessManager.cancel(processName, User("test", "Tester")).futureValue + f.periodicDeploymentManager.cancel(processName, User("test", "Tester")).futureValue f.repository.processEntities.loneElement.active shouldBe false f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed - f.periodicProcessManager.findJobStatus(processName).futureValue shouldBe None + f.periodicDeploymentManager.findJobStatus(processName).futureValue shouldBe None } } diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala index 4c6b7f9857c..bb970448426 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala @@ -39,12 +39,12 @@ class PeriodicProcessServiceIntegrationTest extends FunSuite class Fixture { val hsqlRepo: HsqlProcessRepository = HsqlProcessRepository.prepare - val delegateProcessManagerStub = new ProcessManagerStub + val delegateDeploymentManagerStub = new DeploymentManagerStub val jarManagerStub = new JarManagerStub val events = new ArrayBuffer[PeriodicProcessEvent]() var failListener = false def periodicProcessService(currentTime: Instant) = new PeriodicProcessService( - delegateProcessManager = delegateProcessManagerStub, + delegateDeploymentManager = delegateDeploymentManagerStub, jarManager = jarManagerStub, scheduledProcessesRepository = hsqlRepo.forClock(fixedClock(currentTime)), new PeriodicProcessListener { @@ -155,12 +155,12 @@ class PeriodicProcessServiceIntegrationTest extends FunSuite toDeploy should have length 2 service.deploy(toDeploy.head) - f.delegateProcessManagerStub.setStateStatus(RunningStateStatus("running")) + f.delegateDeploymentManagerStub.setStateStatus(RunningStateStatus("running")) val toDeployAfterDeploy = service.findToBeDeployed.futureValue toDeployAfterDeploy should have length 0 - f.delegateProcessManagerStub.setStateStatus(FinishedStateStatus("finished")) + f.delegateDeploymentManagerStub.setStateStatus(FinishedStateStatus("finished")) service.handleFinished.futureValue val toDeployAfterFinish = service.findToBeDeployed.futureValue @@ -192,7 +192,7 @@ class PeriodicProcessServiceIntegrationTest extends FunSuite val toDeploy = service.findToBeDeployed.futureValue toDeploy should have length 1 service.deploy(toDeploy.head).futureValue - f.delegateProcessManagerStub.setStateStatus(FinishedStateStatus("running")) + f.delegateDeploymentManagerStub.setStateStatus(FinishedStateStatus("running")) tryWithFailedListener { () => service.deactivate(processName) diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala index ad101cf6718..54c91837efc 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala @@ -29,12 +29,12 @@ class PeriodicProcessServiceTest extends FunSuite class Fixture { val repository = new db.InMemPeriodicProcessesRepository - val delegateProcessManagerStub = new ProcessManagerStub + val delegateDeploymentManagerStub = new DeploymentManagerStub val jarManagerStub = new JarManagerStub val events = new ArrayBuffer[PeriodicProcessEvent]() val additionalData = Map("testMap" -> "testValue") val periodicProcessService = new PeriodicProcessService( - delegateProcessManager = delegateProcessManagerStub, + delegateDeploymentManager = delegateDeploymentManagerStub, jarManager = jarManagerStub, scheduledProcessesRepository = repository, new PeriodicProcessListener { @@ -66,7 +66,7 @@ class PeriodicProcessServiceTest extends FunSuite test("handleFinished - should reschedule for finished Flink job") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Finished) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Finished) f.periodicProcessService.handleFinished.futureValue @@ -76,7 +76,7 @@ class PeriodicProcessServiceTest extends FunSuite f.repository.deploymentEntities.map(_.status) shouldBe List(PeriodicProcessDeploymentStatus.Finished, PeriodicProcessDeploymentStatus.Scheduled) val finished :: scheduled :: Nil = f.repository.deploymentEntities.map(createPeriodicProcessDeployment(processEntity, _)).toList - f.events.toList shouldBe List(FinishedEvent(finished, f.delegateProcessManagerStub.jobStatus), ScheduledEvent(scheduled, firstSchedule = false)) + f.events.toList shouldBe List(FinishedEvent(finished, f.delegateDeploymentManagerStub.jobStatus), ScheduledEvent(scheduled, firstSchedule = false)) } test("handle first schedule") { @@ -96,7 +96,7 @@ class PeriodicProcessServiceTest extends FunSuite test("handleFinished - should mark as failed for failed Flink job") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Failed) + f.delegateDeploymentManagerStub.setStateStatus(FlinkStateStatus.Failed) f.periodicProcessService.handleFinished.futureValue @@ -106,7 +106,7 @@ class PeriodicProcessServiceTest extends FunSuite f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed val expectedDetails = createPeriodicProcessDeployment(processEntity, f.repository.deploymentEntities.head) - f.events.toList shouldBe List(FailedEvent(expectedDetails, f.delegateProcessManagerStub.jobStatus)) + f.events.toList shouldBe List(FailedEvent(expectedDetails, f.delegateDeploymentManagerStub.jobStatus)) } test("deploy - should deploy and mark as so") { diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessManagerSpec.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerSpec.scala similarity index 85% rename from engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessManagerSpec.scala rename to engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerSpec.scala index f4d411b41aa..fe919a498a8 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessManagerSpec.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingDeploymentManagerSpec.scala @@ -22,7 +22,7 @@ import pl.touk.nussknacker.engine.util.config.ScalaMajorVersionConfig import scala.concurrent.duration._ //TODO: get rid of at least some Thread.sleep -class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with StreamingDockerTest { +class FlinkStreamingDeploymentManagerSpec extends FunSuite with Matchers with StreamingDockerTest { import pl.touk.nussknacker.engine.kafka.KafkaZookeeperUtils._ @@ -51,7 +51,7 @@ class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with Strea val marshaled = ProcessMarshaller.toJson(ProcessCanonizer.canonize(process)).spaces2 val version = ProcessVersion(15, ProcessName(processId), "user1", Some(13)) - val deployedResponse = processManager.deploy(version, defaultDeploymentData, GraphProcess(marshaled), None) + val deployedResponse = deploymentManager.deploy(version, defaultDeploymentData, GraphProcess(marshaled), None) assert(deployedResponse.isReadyWithin(70 seconds)) } @@ -73,7 +73,7 @@ class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with Strea //this is for the case where e.g. we manually cancel flink job, or it fail and didn't restart... test("cancel of not existing job should not fail") { - processManager.cancel(ProcessName("not existing job"), user = userToAct).futureValue shouldBe (()) + deploymentManager.cancel(ProcessName("not existing job"), user = userToAct).futureValue shouldBe (()) } test("be able verify&redeploy kafka scenario") { @@ -103,7 +103,7 @@ class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with Strea messagesFromTopic(outTopic, 2).last shouldBe "2" - assert(processManager.cancel(ProcessName(kafkaProcess.id), user = userToAct).isReadyWithin(10 seconds)) + assert(deploymentManager.cancel(ProcessName(kafkaProcess.id), user = userToAct).isReadyWithin(10 seconds)) } // TODO: unignore - currently quite often fail during second deployProcessAndWaitIfRunning @@ -142,7 +142,7 @@ class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with Strea messagesFromTopic(outTopic, 1) shouldBe List("List(One element)") val savepointDir = Files.createTempDirectory("customSavepoint") - val savepointPathFuture = processManager.savepoint(ProcessName(processEmittingOneElementAfterStart.id), savepointDir = Some(savepointDir.toUri.toString)) + val savepointPathFuture = deploymentManager.savepoint(ProcessName(processEmittingOneElementAfterStart.id), savepointDir = Some(savepointDir.toUri.toString)) .map(_.path) assert(savepointPathFuture.isReadyWithin(10 seconds)) val savepointPath = new URI(savepointPathFuture.futureValue) @@ -167,10 +167,10 @@ class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with Strea deployProcessAndWaitIfRunning(processEmittingOneElementAfterStart, empty(processId)) messagesFromTopic(outTopic, 1) shouldBe List("List(One element)") - val savepointPath = processManager.stop(ProcessName(processId), savepointDir = None, user = userToAct).map(_.path) + val savepointPath = deploymentManager.stop(ProcessName(processId), savepointDir = None, user = userToAct).map(_.path) eventually { - val status = processManager.findJobStatus(ProcessName(processId)).futureValue + val status = deploymentManager.findJobStatus(ProcessName(processId)).futureValue status.map(_.status) shouldBe Some(FlinkStateStatus.Finished) } @@ -195,7 +195,7 @@ class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with Strea logger.info("Starting to redeploy") val newMarshalled = ProcessMarshaller.toJson(ProcessCanonizer.canonize(StatefulSampleProcess.prepareProcessWithLongState(processId))).spaces2 - val exception = processManager.deploy(empty(process.id), defaultDeploymentData, GraphProcess(newMarshalled), None).failed.futureValue + val exception = deploymentManager.deploy(empty(process.id), defaultDeploymentData, GraphProcess(newMarshalled), None).failed.futureValue exception.getMessage shouldBe "State is incompatible, please stop scenario and start again with clean state" @@ -216,7 +216,7 @@ class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with Strea logger.info("Starting to redeploy") val newMarshalled = ProcessMarshaller.toJson(ProcessCanonizer.canonize(StatefulSampleProcess.processWithMapAggegator(processId, "#AGG.approxCardinality"))).spaces2 - val exception = processManager.deploy(empty(process.id), defaultDeploymentData, GraphProcess(newMarshalled), None).failed.futureValue + val exception = deploymentManager.deploy(empty(process.id), defaultDeploymentData, GraphProcess(newMarshalled), None).failed.futureValue exception.getMessage shouldBe "State is incompatible, please stop scenario and start again with clean state" @@ -228,9 +228,9 @@ class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with Strea test("deploy custom scenario") { val processId = "customProcess" - assert(processManager.deploy(empty(processId), defaultDeploymentData, CustomProcess("pl.touk.nussknacker.engine.management.sample.CustomProcess"), None).isReadyWithin(100 seconds)) + assert(deploymentManager.deploy(empty(processId), defaultDeploymentData, CustomProcess("pl.touk.nussknacker.engine.management.sample.CustomProcess"), None).isReadyWithin(100 seconds)) - val jobStatus = processManager.findJobStatus(ProcessName(processId)).futureValue + val jobStatus = deploymentManager.findJobStatus(ProcessName(processId)).futureValue jobStatus.map(_.status.name) shouldBe Some(FlinkStateStatus.Running.name) jobStatus.map(_.status.isRunning) shouldBe Some(true) @@ -267,10 +267,10 @@ class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with Strea private def deployProcessAndWaitIfRunning(process: EspProcess, processVersion: ProcessVersion, savepointPath : Option[String] = None) = { val marshaled = ProcessMarshaller.toJson(ProcessCanonizer.canonize(process)).spaces2 - assert(processManager.deploy(processVersion, defaultDeploymentData, GraphProcess(marshaled), savepointPath).isReadyWithin(100 seconds)) + assert(deploymentManager.deploy(processVersion, defaultDeploymentData, GraphProcess(marshaled), savepointPath).isReadyWithin(100 seconds)) eventually { - val jobStatus = processManager.findJobStatus(ProcessName(process.id)).futureValue + val jobStatus = deploymentManager.findJobStatus(ProcessName(process.id)).futureValue logger.debug(s"Waiting for deploy: ${process.id}, $jobStatus") jobStatus.map(_.status.name) shouldBe Some(FlinkStateStatus.Running.name) @@ -279,9 +279,9 @@ class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with Strea } private def cancel(processId: String): Unit = { - assert(processManager.cancel(ProcessName(processId), user = userToAct).isReadyWithin(10 seconds)) + assert(deploymentManager.cancel(ProcessName(processId), user = userToAct).isReadyWithin(10 seconds)) eventually { - val runningJobs = processManager + val runningJobs = deploymentManager .findJobStatus(ProcessName(processId)) .futureValue .filter(_.status.isRunning) @@ -294,5 +294,5 @@ class FlinkStreamingProcessManagerSpec extends FunSuite with Matchers with Strea } private def processVersion(processId: ProcessName): Option[ProcessVersion] = - processManager.findJobStatus(processId).futureValue.flatMap(_.version) + deploymentManager.findJobStatus(processId).futureValue.flatMap(_.version) } diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala index 7e07663b9f3..9ae4cb2bcab 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/FlinkStreamingProcessTestRunnerSpec.scala @@ -8,7 +8,7 @@ import pl.touk.nussknacker.engine.api.deployment.TestProcess.{NodeResult, Result import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.build.EspProcessBuilder import pl.touk.nussknacker.engine.canonize.ProcessCanonizer -import pl.touk.nussknacker.engine.management.FlinkStreamingProcessManagerProvider +import pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider import pl.touk.nussknacker.engine.marshall.ProcessMarshaller import pl.touk.nussknacker.engine.util.config.ScalaMajorVersionConfig import pl.touk.nussknacker.test.VeryPatientScalaFutures @@ -24,14 +24,14 @@ class FlinkStreamingProcessTestRunnerSpec extends FlatSpec with Matchers with Ve .withValue("modelConfig.classPath", ConfigValueFactory.fromIterable(Collections.singletonList(classPath))) it should "run scenario in test mode" in { - val processManager = FlinkStreamingProcessManagerProvider.defaultProcessManager(config) + val deploymentManager = FlinkStreamingDeploymentManagerProvider.defaultDeploymentManager(config) val processId = UUID.randomUUID().toString val process = SampleProcess.prepareProcess(processId) val processData = ProcessMarshaller.toJson(ProcessCanonizer.canonize(process)).spaces2 - whenReady(processManager.test(ProcessName(processId), processData, TestData("terefere"), identity)) { r => + whenReady(deploymentManager.test(ProcessName(processId), processData, TestData("terefere"), identity)) { r => r.nodeResults shouldBe Map( "startProcess" -> List(NodeResult(ResultContext(s"$processId-startProcess-0-0", Map("input" -> "terefere")))), "nightFilter" -> List(NodeResult(ResultContext(s"$processId-startProcess-0-0", Map("input" -> "terefere")))), @@ -49,13 +49,13 @@ class FlinkStreamingProcessTestRunnerSpec extends FlatSpec with Matchers with Ve .source("startProcess", "kafka-transaction") .emptySink("endSend", "sendSmsNotExist") - val processManager = FlinkStreamingProcessManagerProvider.defaultProcessManager(config) + val deploymentManager = FlinkStreamingDeploymentManagerProvider.defaultDeploymentManager(config) val processData = ProcessMarshaller.toJson(ProcessCanonizer.canonize(process)).spaces2 val caught = intercept[IllegalArgumentException] { - Await.result(processManager.test(ProcessName(processId), processData, TestData("terefere"), _ => null), patienceConfig.timeout) + Await.result(deploymentManager.test(ProcessName(processId), processData, TestData("terefere"), _ => null), patienceConfig.timeout) } caught.getMessage shouldBe "Compilation errors: MissingSinkFactory(sendSmsNotExist,endSend)" } diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/JavaConfigProcessManagerSpec.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/JavaConfigDeploymentManagerSpec.scala similarity index 77% rename from engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/JavaConfigProcessManagerSpec.scala rename to engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/JavaConfigDeploymentManagerSpec.scala index ba33c9a2951..1d4bde90e2b 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/JavaConfigProcessManagerSpec.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/JavaConfigDeploymentManagerSpec.scala @@ -12,7 +12,7 @@ import pl.touk.nussknacker.engine.util.config.ScalaMajorVersionConfig import scala.concurrent.duration._ -class JavaConfigProcessManagerSpec extends FunSuite with Matchers with StreamingDockerTest { +class JavaConfigDeploymentManagerSpec extends FunSuite with Matchers with StreamingDockerTest { override protected def classPath: String = s"./engine/flink/management/java_sample/target/scala-${ScalaMajorVersionConfig.scalaMajorVersion}/managementJavaSample.jar" @@ -26,13 +26,13 @@ class JavaConfigProcessManagerSpec extends FunSuite with Matchers with Streaming .emptySink("endSend", "sink") val marshaled = ProcessMarshaller.toJson(ProcessCanonizer.canonize(process)).spaces2 - assert(processManager.deploy(ProcessVersion.empty.copy(processName=ProcessName(process.id)), DeploymentData.empty, + assert(deploymentManager.deploy(ProcessVersion.empty.copy(processName=ProcessName(process.id)), DeploymentData.empty, GraphProcess(marshaled), None).isReadyWithin(100 seconds)) Thread.sleep(1000) - val jobStatus = processManager.findJobStatus(ProcessName(process.id)).futureValue + val jobStatus = deploymentManager.findJobStatus(ProcessName(process.id)).futureValue jobStatus.map(_.status.name) shouldBe Some(FlinkStateStatus.Running.name) jobStatus.map(_.status.isRunning) shouldBe Some(true) - assert(processManager.cancel(ProcessName(process.id), user = userToAct).isReadyWithin(10 seconds)) + assert(deploymentManager.cancel(ProcessName(process.id), user = userToAct).isReadyWithin(10 seconds)) } } diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala index 2fa5efd0d2c..7c329175247 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala @@ -4,9 +4,9 @@ import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import com.typesafe.config.ConfigValueFactory.fromAnyRef import com.whisk.docker.{ContainerLink, DockerContainer, DockerReadyChecker} import org.scalatest.Suite -import pl.touk.nussknacker.engine.api.deployment.ProcessManager +import pl.touk.nussknacker.engine.api.deployment.DeploymentManager import pl.touk.nussknacker.engine.kafka.KafkaClient -import pl.touk.nussknacker.engine.management.{DockerTest, FlinkStreamingProcessManagerProvider} +import pl.touk.nussknacker.engine.management.{DockerTest, FlinkStreamingDeploymentManagerProvider} import pl.touk.nussknacker.engine.util.config.ScalaMajorVersionConfig import scala.concurrent.duration._ @@ -49,6 +49,6 @@ trait StreamingDockerTest extends DockerTest { self: Suite => private def kafkaAddress = s"${ipOfContainer(kafkaContainer)}:$KafkaPort" - protected lazy val processManager: ProcessManager = FlinkStreamingProcessManagerProvider.defaultProcessManager(config) + protected lazy val deploymentManager: DeploymentManager = FlinkStreamingDeploymentManagerProvider.defaultDeploymentManager(config) } diff --git a/engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider b/engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider new file mode 100644 index 00000000000..a98ccd750e2 --- /dev/null +++ b/engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider @@ -0,0 +1 @@ +pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider \ No newline at end of file diff --git a/engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider b/engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider deleted file mode 100644 index 9b4bcbb859e..00000000000 --- a/engine/flink/management/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider +++ /dev/null @@ -1 +0,0 @@ -pl.touk.nussknacker.engine.management.FlinkStreamingProcessManagerProvider \ No newline at end of file diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala similarity index 95% rename from engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessManager.scala rename to engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala index b810e7d1ea7..8a3a522505c 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkProcessManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala @@ -7,12 +7,12 @@ import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment.TestProcess.{TestData, TestResults} import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.process.ProcessName -import pl.touk.nussknacker.engine.management.FlinkProcessManager.prepareProgramArgs +import pl.touk.nussknacker.engine.management.FlinkDeploymentManager.prepareProgramArgs import scala.concurrent.{ExecutionContext, Future} -abstract class FlinkProcessManager(modelData: ModelData, shouldVerifyBeforeDeploy: Boolean, mainClassName: String) - extends ProcessManager with LazyLogging { +abstract class FlinkDeploymentManager(modelData: ModelData, shouldVerifyBeforeDeploy: Boolean, mainClassName: String) + extends DeploymentManager with LazyLogging { private implicit val ec: ExecutionContext = ExecutionContext.Implicits.global @@ -118,7 +118,7 @@ abstract class FlinkProcessManager(modelData: ModelData, shouldVerifyBeforeDeplo override def processStateDefinitionManager: ProcessStateDefinitionManager = FlinkProcessStateDefinitionManager } -object FlinkProcessManager { +object FlinkDeploymentManager { def prepareProgramArgs(serializedConfig: String, processVersion: ProcessVersion, diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala index 4396d73237c..c870cf1ea37 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala @@ -20,7 +20,7 @@ import scala.concurrent.{Await, Future} class FlinkRestManager(config: FlinkConfig, modelData: ModelData, mainClassName: String) (implicit backend: SttpBackend[Future, Nothing, NothingT]) - extends FlinkProcessManager(modelData, config.shouldVerifyBeforeDeploy, mainClassName) with LazyLogging { + extends FlinkDeploymentManager(modelData, config.shouldVerifyBeforeDeploy, mainClassName) with LazyLogging { protected lazy val jarFile: File = new FlinkModelJar().buildJobJar(modelData) diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingProcessManagerProvider.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala similarity index 71% rename from engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingProcessManagerProvider.scala rename to engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala index f71c23b2541..561b2a087f5 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingProcessManagerProvider.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala @@ -3,9 +3,9 @@ package pl.touk.nussknacker.engine.management import com.typesafe.config.Config import org.asynchttpclient.DefaultAsyncHttpClientConfig import pl.touk.nussknacker.engine.ModelData.ClasspathConfig -import pl.touk.nussknacker.engine.{ModelData, ProcessManagerProvider, ProcessingTypeConfig} +import pl.touk.nussknacker.engine.{ModelData, DeploymentManagerProvider, ProcessingTypeConfig} import pl.touk.nussknacker.engine.api.{StreamMetaData, TypeSpecificData} -import pl.touk.nussknacker.engine.api.deployment.ProcessManager +import pl.touk.nussknacker.engine.api.deployment.DeploymentManager import pl.touk.nussknacker.engine.flink.queryablestate.FlinkQueryableClient import pl.touk.nussknacker.engine.api.queryablestate.QueryableClient import sttp.client.{NothingT, SttpBackend} @@ -13,13 +13,13 @@ import sttp.client.asynchttpclient.future.AsyncHttpClientFutureBackend import scala.concurrent.Future -class FlinkStreamingProcessManagerProvider extends ProcessManagerProvider { +class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider { import net.ceedubs.ficus.readers.ArbitraryTypeReader._ import net.ceedubs.ficus.Ficus._ import pl.touk.nussknacker.engine.util.config.ConfigEnrichments._ - override def createProcessManager(modelData: ModelData, config: Config): ProcessManager = { + override def createDeploymentManager(modelData: ModelData, config: Config): DeploymentManager = { implicit val backend: SttpBackend[Future, Nothing, NothingT] = AsyncHttpClientFutureBackend.usingConfig(new DefaultAsyncHttpClientConfig.Builder().build()) val flinkConfig = config.rootAs[FlinkConfig] @@ -39,10 +39,10 @@ class FlinkStreamingProcessManagerProvider extends ProcessManagerProvider { override def supportsSignals: Boolean = true } -object FlinkStreamingProcessManagerProvider { +object FlinkStreamingDeploymentManagerProvider { - def defaultProcessManager(config: Config): ProcessManager = { + def defaultDeploymentManager(config: Config): DeploymentManager = { val typeConfig = ProcessingTypeConfig.read(config) - new FlinkStreamingProcessManagerProvider().createProcessManager(typeConfig.toModelData, typeConfig.deploymentConfig) + new FlinkStreamingDeploymentManagerProvider().createDeploymentManager(typeConfig.toModelData, typeConfig.deploymentConfig) } } \ No newline at end of file diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala index 066a9d12ccd..639bd9f983a 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala @@ -100,7 +100,7 @@ class FlinkRestManagerSpec extends FunSuite with Matchers with PatientScalaFutur (manager, history) } - def processState(manager: FlinkProcessManager, + def processState(manager: FlinkDeploymentManager, deploymentId: ExternalDeploymentId, status: StateStatus, version: Option[ProcessVersion] = Option.empty, diff --git a/engine/interpreter/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider b/engine/interpreter/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider new file mode 100644 index 00000000000..44e3196e79e --- /dev/null +++ b/engine/interpreter/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider @@ -0,0 +1 @@ +pl.touk.nussknacker.engine.testing.DeploymentManagerProviderStub \ No newline at end of file diff --git a/engine/interpreter/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider b/engine/interpreter/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider deleted file mode 100644 index b385f397971..00000000000 --- a/engine/interpreter/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider +++ /dev/null @@ -1 +0,0 @@ -pl.touk.nussknacker.engine.testing.ProcessManagerProviderStub \ No newline at end of file diff --git a/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/ProcessManagerProvider.scala b/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerProvider.scala similarity index 66% rename from engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/ProcessManagerProvider.scala rename to engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerProvider.scala index d5d50aea7b9..62fd01fd106 100644 --- a/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/ProcessManagerProvider.scala +++ b/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/DeploymentManagerProvider.scala @@ -4,14 +4,14 @@ import java.net.URL import com.typesafe.config.Config import net.ceedubs.ficus.readers.ValueReader import pl.touk.nussknacker.engine.api.{NamedServiceProvider, TypeSpecificData} -import pl.touk.nussknacker.engine.api.deployment.ProcessManager +import pl.touk.nussknacker.engine.api.deployment.DeploymentManager import pl.touk.nussknacker.engine.api.queryablestate.QueryableClient import pl.touk.nussknacker.engine.util.loader.ModelClassLoader -trait ProcessManagerProvider extends NamedServiceProvider { +trait DeploymentManagerProvider extends NamedServiceProvider { - def createProcessManager(modelData: ModelData, config: Config): ProcessManager + def createDeploymentManager(modelData: ModelData, config: Config): DeploymentManager def createQueryableClient(config: Config): Option[QueryableClient] @@ -20,7 +20,7 @@ trait ProcessManagerProvider extends NamedServiceProvider { def supportsSignals: Boolean } -case class ProcessingTypeData(processManager: ProcessManager, +case class ProcessingTypeData(deploymentManager: DeploymentManager, modelData: ModelData, emptyProcessCreate: Boolean => TypeSpecificData, queryableClient: Option[QueryableClient], @@ -28,7 +28,7 @@ case class ProcessingTypeData(processManager: ProcessManager, def close(): Unit = { modelData.close() - processManager.close() + deploymentManager.close() queryableClient.foreach(_.close()) } @@ -62,20 +62,20 @@ object ProcessingTypeData { type ProcessingType = String - def createProcessingTypeData(processManagerProvider: ProcessManagerProvider, modelData: ModelData, managerConfig: Config): ProcessingTypeData = { - val manager = processManagerProvider.createProcessManager(modelData, managerConfig) - val queryableClient = processManagerProvider.createQueryableClient(managerConfig) + def createProcessingTypeData(deploymentManagerProvider: DeploymentManagerProvider, modelData: ModelData, managerConfig: Config): ProcessingTypeData = { + val manager = deploymentManagerProvider.createDeploymentManager(modelData, managerConfig) + val queryableClient = deploymentManagerProvider.createQueryableClient(managerConfig) ProcessingTypeData( manager, modelData, - processManagerProvider.emptyProcessMetadata, + deploymentManagerProvider.emptyProcessMetadata, queryableClient, - processManagerProvider.supportsSignals) + deploymentManagerProvider.supportsSignals) } - def createProcessingTypeData(processManagerProvider: ProcessManagerProvider, processTypeConfig: ProcessingTypeConfig): ProcessingTypeData = { + def createProcessingTypeData(deploymentManagerProvider: DeploymentManagerProvider, processTypeConfig: ProcessingTypeConfig): ProcessingTypeData = { val modelData = processTypeConfig.toModelData val managerConfig = processTypeConfig.deploymentConfig - createProcessingTypeData(processManagerProvider, modelData, managerConfig) + createProcessingTypeData(deploymentManagerProvider, modelData, managerConfig) } } diff --git a/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/testing/ProcessManagerStub.scala b/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala similarity index 84% rename from engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/testing/ProcessManagerStub.scala rename to engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala index 0d43fe0fde9..61902fb50e9 100644 --- a/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/testing/ProcessManagerStub.scala +++ b/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala @@ -3,14 +3,14 @@ package pl.touk.nussknacker.engine.testing import com.typesafe.config.Config import pl.touk.nussknacker.engine.api.deployment.simple.SimpleProcessStateDefinitionManager import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData, TypeSpecificData} -import pl.touk.nussknacker.engine.api.deployment.{CustomAction, CustomActionError, CustomActionNotImplemented, CustomActionRequest, CustomActionResult, DeploymentData, ExternalDeploymentId, ProcessDeploymentData, ProcessManager, ProcessState, ProcessStateDefinitionManager, SavepointResult, TestProcess, User} +import pl.touk.nussknacker.engine.api.deployment.{CustomAction, CustomActionError, CustomActionNotImplemented, CustomActionRequest, CustomActionResult, DeploymentData, ExternalDeploymentId, ProcessDeploymentData, DeploymentManager, ProcessState, ProcessStateDefinitionManager, SavepointResult, TestProcess, User} import pl.touk.nussknacker.engine.api.process.ProcessName -import pl.touk.nussknacker.engine.{ModelData, ProcessManagerProvider} +import pl.touk.nussknacker.engine.{ModelData, DeploymentManagerProvider} import pl.touk.nussknacker.engine.api.queryablestate.QueryableClient import scala.concurrent.Future -class ProcessManagerStub extends ProcessManager { +class DeploymentManagerStub extends DeploymentManager { override def deploy(processVersion: ProcessVersion, deploymentData: DeploymentData, processDeploymentData: ProcessDeploymentData, savepointPath: Option[String]): Future[Option[ExternalDeploymentId]] = @@ -40,9 +40,9 @@ class ProcessManagerStub extends ProcessManager { //This provider can be used for testing. Override methods to implement more complex behaviour //Provider is registered via ServiceLoader, so it can be used e.g. to run simple docker configuration -class ProcessManagerProviderStub extends ProcessManagerProvider { +class DeploymentManagerProviderStub extends DeploymentManagerProvider { - override def createProcessManager(modelData: ModelData, config: Config): ProcessManager = new ProcessManagerStub + override def createDeploymentManager(modelData: ModelData, config: Config): DeploymentManager = new DeploymentManagerStub override def createQueryableClient(config: Config): Option[QueryableClient] = None diff --git a/engine/standalone/engine/src/it/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneProcessManagerSpec.scala b/engine/standalone/engine/src/it/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneDeploymentManagerSpec.scala similarity index 88% rename from engine/standalone/engine/src/it/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneProcessManagerSpec.scala rename to engine/standalone/engine/src/it/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneDeploymentManagerSpec.scala index c2e67bec0b1..9aa25a3fcef 100644 --- a/engine/standalone/engine/src/it/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneProcessManagerSpec.scala +++ b/engine/standalone/engine/src/it/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneDeploymentManagerSpec.scala @@ -17,15 +17,15 @@ import pl.touk.nussknacker.engine.marshall.ProcessMarshaller import pl.touk.nussknacker.engine.util.config.ScalaMajorVersionConfig import pl.touk.nussknacker.test.VeryPatientScalaFutures -class StandaloneProcessManagerSpec extends FunSuite with VeryPatientScalaFutures with Matchers { +class StandaloneDeploymentManagerSpec extends FunSuite with VeryPatientScalaFutures with Matchers { test("it should parse test data and test standalone process") { val config = ScalaMajorVersionConfig.configWithScalaMajorVersion(ConfigFactory.parseResources("standalone.conf")) - val modelData = StandaloneProcessManagerProvider + val modelData = StandaloneDeploymentManagerProvider .defaultTypeConfig(config).toModelData - val manager = new StandaloneProcessManager(modelData, null) + val manager = new StandaloneDeploymentManager(modelData, null) val process = ProcessMarshaller.toJson(ProcessCanonizer.canonize(EspProcessBuilder .id("") diff --git a/engine/standalone/engine/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider b/engine/standalone/engine/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider new file mode 100644 index 00000000000..d5516462011 --- /dev/null +++ b/engine/standalone/engine/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider @@ -0,0 +1 @@ +pl.touk.nussknacker.engine.standalone.management.StandaloneDeploymentManagerProvider \ No newline at end of file diff --git a/engine/standalone/engine/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider b/engine/standalone/engine/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider deleted file mode 100644 index 5a5ec1a8ee8..00000000000 --- a/engine/standalone/engine/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.ProcessManagerProvider +++ /dev/null @@ -1 +0,0 @@ -pl.touk.nussknacker.engine.standalone.management.StandaloneProcessManagerProvider \ No newline at end of file diff --git a/engine/standalone/engine/src/main/scala/pl/touk/nussknacker/engine/standalone/deployment/DeploymentService.scala b/engine/standalone/engine/src/main/scala/pl/touk/nussknacker/engine/standalone/deployment/DeploymentService.scala index f85a33d8db3..51faf768918 100644 --- a/engine/standalone/engine/src/main/scala/pl/touk/nussknacker/engine/standalone/deployment/DeploymentService.scala +++ b/engine/standalone/engine/src/main/scala/pl/touk/nussknacker/engine/standalone/deployment/DeploymentService.scala @@ -16,7 +16,7 @@ import pl.touk.nussknacker.engine.marshall.{ProcessMarshaller, ProcessUnmarshall import pl.touk.nussknacker.engine.resultcollector.ProductionServiceInvocationCollector import pl.touk.nussknacker.engine.standalone.StandaloneProcessInterpreter import pl.touk.nussknacker.engine.standalone.api.{StandaloneContextPreparer, StandaloneDeploymentData} -import pl.touk.nussknacker.engine.standalone.management.StandaloneProcessManagerProvider +import pl.touk.nussknacker.engine.standalone.management.StandaloneDeploymentManagerProvider import scala.concurrent.ExecutionContext @@ -25,7 +25,7 @@ object DeploymentService { //TODO this is temporary solution, we should keep these processes e.g. in ZK //also: how to pass model data around? def apply(context: StandaloneContextPreparer, config: Config): DeploymentService = { - val modelData = StandaloneProcessManagerProvider.defaultTypeConfig(config).toModelData + val modelData = StandaloneDeploymentManagerProvider.defaultTypeConfig(config).toModelData new DeploymentService(context, modelData, FileProcessRepository(config.getString("standaloneEngineProcessLocation"))) } diff --git a/engine/standalone/engine/src/main/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneProcessManager.scala b/engine/standalone/engine/src/main/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneDeploymentManager.scala similarity index 94% rename from engine/standalone/engine/src/main/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneProcessManager.scala rename to engine/standalone/engine/src/main/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneDeploymentManager.scala index 77ceca2cfeb..3dfe7f42f2b 100644 --- a/engine/standalone/engine/src/main/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneProcessManager.scala +++ b/engine/standalone/engine/src/main/scala/pl/touk/nussknacker/engine/standalone/management/StandaloneDeploymentManager.scala @@ -37,12 +37,12 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.Using -object StandaloneProcessManager { - def apply(modelData: ModelData, config: Config) : StandaloneProcessManager = new StandaloneProcessManager(modelData, StandaloneProcessClient(config)) +object StandaloneDeploymentManager { + def apply(modelData: ModelData, config: Config) : StandaloneDeploymentManager = new StandaloneDeploymentManager(modelData, StandaloneProcessClient(config)) } -class StandaloneProcessManager(modelData: ModelData, client: StandaloneProcessClient) - extends ProcessManager with LazyLogging { +class StandaloneDeploymentManager(modelData: ModelData, client: StandaloneProcessClient) + extends DeploymentManager with LazyLogging { private implicit val ec: ExecutionContext = ExecutionContext.Implicits.global @@ -210,10 +210,10 @@ object TestUtils { } -class StandaloneProcessManagerProvider extends ProcessManagerProvider { +class StandaloneDeploymentManagerProvider extends DeploymentManagerProvider { - override def createProcessManager(modelData: ModelData, config: Config): ProcessManager = - StandaloneProcessManager(modelData, config) + override def createDeploymentManager(modelData: ModelData, config: Config): DeploymentManager = + StandaloneDeploymentManager(modelData, config) override def createQueryableClient(config: Config): Option[QueryableClient] = None @@ -224,7 +224,7 @@ class StandaloneProcessManagerProvider extends ProcessManagerProvider { override def supportsSignals: Boolean = false } -object StandaloneProcessManagerProvider { +object StandaloneDeploymentManagerProvider { import net.ceedubs.ficus.readers.ArbitraryTypeReader._ import pl.touk.nussknacker.engine.util.config.CustomFicusInstances._ diff --git a/ui/README.md b/ui/README.md index b4029496322..f6a650f4fef 100644 --- a/ui/README.md +++ b/ui/README.md @@ -7,8 +7,8 @@ If you want to run UI to develop/debug your own model, please skip to last secti Before running either from console or from IDE you have to manually build: - run `npm ci` in `ui/client` (only if you want to test/compile FE, see `Readme.md` in `ui/client` for more details) -- custom models (```assemblySamples``` in sbt - not needed if running from IDE with stubbed ProcessManager, see below) -- ProcessManager(s) (```assemblyEngines``` in sbt - not needed if running from IDE with stubbed ProcessManager, see below) +- custom models (```assemblySamples``` in sbt - not needed if running from IDE with stubbed DeploymentManager, see below) +- DeploymentManager(s) (```assemblyDeploymentManagers``` in sbt - not needed if running from IDE with stubbed DeploymentManager, see below) - UI (```ui/assembly``` in sbt, not needed if you want to use FE development mode) You can do all steps at once with ```buildServer.sh``` script @@ -24,7 +24,7 @@ You can do all steps at once with ```buildServer.sh``` script If you want to connect to infrastructure in docker you need to set on end of line also: ```;FLINK_REST_URL=http://localhost:3031;FLINK_QUERYABLE_STATE_PROXY_URL=localhost:3063;SCHEMA_REGISTRY_URL=http://localhost:3082;KAFKA_ADDRESS=localhost:3032``` * Module classpath: nussknacker-ui (this is ```ui/server``` folder) - * "Included dependencies with "Provided" scope" should be checked, so that Flink ProcessManager is included in the classpath + * "Included dependencies with "Provided" scope" should be checked, so that Flink DeploymentManager is included in the classpath ## Running backend for frontend development If you want run backend only for front-end development, please run `./runServer.sh` @@ -36,7 +36,7 @@ If you want run backend only for front-end development, please run `./runServer. ## Access to service Service should be available at ~~http://localhost:8080/api~~ -# Running UI to develop/debug additional models with stubbed ProcessManager +# Running UI to develop/debug additional models with stubbed DeploymentManager If you want to run Nussknacker UI to see if your model behaves correctly (e.g. if dynamic parameters are OK), you don't have to follow steps above. You also don't have to build model jar and put it in diff --git a/ui/buildServer.sh b/ui/buildServer.sh index 95c60d022f8..297eb500e09 100755 --- a/ui/buildServer.sh +++ b/ui/buildServer.sh @@ -3,6 +3,6 @@ set -e cd .. -#assemblySamples is needed to use models, assemblyEngines - to access process managers, ui/assembly - to be able to use FE -./sbtwrapper ";assemblySamples;assemblyEngines;ui/assembly" +#assemblySamples is needed to use models, assemblyDeploymentManagers - to access process managers, ui/assembly - to be able to use FE +./sbtwrapper ";assemblySamples;assemblyDeploymentManagers;ui/assembly" cd - \ No newline at end of file diff --git a/ui/runServer.sh b/ui/runServer.sh index d8ec7ed7d48..7f30381ab77 100755 --- a/ui/runServer.sh +++ b/ui/runServer.sh @@ -8,7 +8,7 @@ cd $WORKING_DIR SCALA_VERSION=${SCALA_VERSION:-2.12} PROJECT_BASE_DIR="../../.." -#management jars are currently needed to access ProcessManagers +#management jars are currently needed to access DeploymentManagers FLINK_ENGINE_JAR=$PROJECT_BASE_DIR/engine/flink/management/target/scala-${SCALA_VERSION}/nussknacker-flink-manager.jar STANDALONE_ENGINE_JAR=$PROJECT_BASE_DIR/engine/standalone/engine/target/scala-${SCALA_VERSION}/nussknacker-standalone-manager.jar diff --git a/ui/server/src/main/scala/pl/touk/nussknacker/ui/NussknackerApp.scala b/ui/server/src/main/scala/pl/touk/nussknacker/ui/NussknackerApp.scala index 272b4c5a7f3..6a4ef31edcb 100644 --- a/ui/server/src/main/scala/pl/touk/nussknacker/ui/NussknackerApp.scala +++ b/ui/server/src/main/scala/pl/touk/nussknacker/ui/NussknackerApp.scala @@ -80,7 +80,7 @@ trait NusskanckerDefaultAppRouter extends NusskanckerAppRouter { val processCategoryService = new ConfigProcessCategoryService(config) - val managers = typeToConfig.mapValues(_.processManager) + val managers = typeToConfig.mapValues(_.deploymentManager) val subprocessRepository = new DbSubprocessRepository(dbConfig, system.dispatcher) val subprocessResolver = new SubprocessResolver(subprocessRepository) diff --git a/ui/server/src/main/scala/pl/touk/nussknacker/ui/api/DefinitionResources.scala b/ui/server/src/main/scala/pl/touk/nussknacker/ui/api/DefinitionResources.scala index cd7dfbcbd59..2581459cf25 100644 --- a/ui/server/src/main/scala/pl/touk/nussknacker/ui/api/DefinitionResources.scala +++ b/ui/server/src/main/scala/pl/touk/nussknacker/ui/api/DefinitionResources.scala @@ -48,7 +48,7 @@ class DefinitionResources(modelDataProvider: ProcessingTypeDataProvider[ModelDat complete( UIProcessObjectsFactory.prepareUIProcessObjects( processingTypeData.modelData, - processingTypeData.processManager, + processingTypeData.deploymentManager, user, subprocesses, isSubprocess, diff --git a/ui/server/src/main/scala/pl/touk/nussknacker/ui/api/ProcessesResources.scala b/ui/server/src/main/scala/pl/touk/nussknacker/ui/api/ProcessesResources.scala index 8572aa49c3a..064f9839460 100644 --- a/ui/server/src/main/scala/pl/touk/nussknacker/ui/api/ProcessesResources.scala +++ b/ui/server/src/main/scala/pl/touk/nussknacker/ui/api/ProcessesResources.scala @@ -8,7 +8,7 @@ import cats.data.Validated.{Invalid, Valid} import cats.instances.future._ import cats.data.Validated import cats.syntax.either._ -import pl.touk.nussknacker.engine.api.deployment.{ProcessManager, ProcessState} +import pl.touk.nussknacker.engine.api.deployment.{DeploymentManager, ProcessState} import pl.touk.nussknacker.ui.api.ProcessesResources.{UnmarshallError, WrongProcessId} import pl.touk.nussknacker.restmodel.displayedgraph.{DisplayableProcess, ProcessStatus, ValidatedDisplayableProcess} import pl.touk.nussknacker.ui.process.marshall.ProcessConverter @@ -330,13 +330,13 @@ class ProcessesResources( //This is temporary function to enriching process state data //TODO: Remove it when we will support cache for state private def enrichDetailsWithProcessState[PS: ProcessShapeFetchStrategy](process: BaseProcessDetails[PS]): BaseProcessDetails[PS] = - process.copy(state = processManager(process.processingType).map(m => ProcessStatus.createState( + process.copy(state = deploymentManager(process.processingType).map(m => ProcessStatus.createState( m.processStateDefinitionManager.mapActionToStatus(process.lastAction.map(_.action)), m.processStateDefinitionManager ))) - private def processManager(processingType: ProcessingType): Option[ProcessManager] = - typeToConfig.forType(processingType).map(_.processManager) + private def deploymentManager(processingType: ProcessingType): Option[DeploymentManager] = + typeToConfig.forType(processingType).map(_.deploymentManager) private def withJson(processId: ProcessId, version: Long, businessView: Boolean) (process: DisplayableProcess => ToResponseMarshallable)(implicit user: LoggedUser): ToResponseMarshallable diff --git a/ui/server/src/main/scala/pl/touk/nussknacker/ui/definition/UIProcessObjectsFactory.scala b/ui/server/src/main/scala/pl/touk/nussknacker/ui/definition/UIProcessObjectsFactory.scala index c8e429adcaf..d9d494f473d 100644 --- a/ui/server/src/main/scala/pl/touk/nussknacker/ui/definition/UIProcessObjectsFactory.scala +++ b/ui/server/src/main/scala/pl/touk/nussknacker/ui/definition/UIProcessObjectsFactory.scala @@ -4,7 +4,7 @@ import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.MetaData import pl.touk.nussknacker.engine.api.async.{DefaultAsyncInterpretationValue, DefaultAsyncInterpretationValueDeterminer} import pl.touk.nussknacker.engine.api.definition.{Parameter, RawParameterEditor} -import pl.touk.nussknacker.engine.api.deployment.ProcessManager +import pl.touk.nussknacker.engine.api.deployment.DeploymentManager import pl.touk.nussknacker.engine.api.process.{AdditionalPropertyConfig, ParameterConfig, SingleNodeConfig} import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -31,7 +31,7 @@ object UIProcessObjectsFactory { import pl.touk.nussknacker.engine.util.config.FicusReaders._ def prepareUIProcessObjects(modelDataForType: ModelData, - processManager: ProcessManager, + deploymentManager: DeploymentManager, user: LoggedUser, subprocessesDetails: Set[SubprocessDetails], isSubprocess: Boolean, @@ -85,7 +85,7 @@ object UIProcessObjectsFactory { processDefinition = chosenProcessDefinition, isSubprocess = isSubprocess, subprocessesDetails = subprocessesDetails), - customActions = processManager.customActions.map(UICustomAction(_)), + customActions = deploymentManager.customActions.map(UICustomAction(_)), defaultAsyncInterpretation = defaultAsyncInterpretation.value) } diff --git a/ui/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ManagementActor.scala b/ui/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ManagementActor.scala index fe7642e4793..54ad55c545b 100644 --- a/ui/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ManagementActor.scala +++ b/ui/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ManagementActor.scala @@ -30,7 +30,7 @@ import scala.util.control.NonFatal import scala.util.{Failure, Success} object ManagementActor { - def props(managers: ProcessingTypeDataProvider[ProcessManager], + def props(managers: ProcessingTypeDataProvider[DeploymentManager], processRepository: FetchingProcessRepository[Future], processActionRepository: DbProcessActionRepository, subprocessResolver: SubprocessResolver, @@ -40,7 +40,7 @@ object ManagementActor { } } -class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], +class ManagementActor(managers: ProcessingTypeDataProvider[DeploymentManager], processRepository: FetchingProcessRepository[Future], deployedProcessRepository: DbProcessActionRepository, subprocessResolver: SubprocessResolver, @@ -57,20 +57,20 @@ class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], reply(withDeploymentInfo(process, user, DeploymentActionType.Deployment, comment, deployRes)) } case Snapshot(id, user, savepointDir) => - reply(processManager(id.id)(ec, user).flatMap(_.savepoint(id.name, savepointDir))) + reply(deploymentManager(id.id)(ec, user).flatMap(_.savepoint(id.name, savepointDir))) case Stop(id, user, savepointDir) => - reply(processManager(id.id)(ec, user).flatMap(_.stop(id.name, savepointDir, toManagerUser(user)))) + reply(deploymentManager(id.id)(ec, user).flatMap(_.stop(id.name, savepointDir, toManagerUser(user)))) case Cancel(id, user, comment) => ensureNoDeploymentRunning { implicit val loggedUser: LoggedUser = user val cancelRes = cancelProcess(id, comment) reply(withDeploymentInfo(id, user, DeploymentActionType.Cancel, comment, cancelRes)) } - //TODO: should be handled in ProcessManager + //TODO: should be handled in DeploymentManager case CheckStatus(id, user) if isBeingDeployed(id.name) => implicit val loggedUser: LoggedUser = user val processStatus = for { - manager <- processManager(id.id) + manager <- deploymentManager(id.id) } yield ProcessStatus.createState( SimpleStateStatus.DuringDeploy, manager.processStateDefinitionManager @@ -94,7 +94,7 @@ class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], ensureNoDeploymentRunning { implicit val loggedUser: LoggedUser = user val testAction = for { - manager <- processManager(id.id) + manager <- deploymentManager(id.id) resolvedProcess <- resolveGraph(processJson) testResult <- manager.test(id.name, resolvedProcess, testData, encoder) } yield testResult @@ -115,7 +115,7 @@ class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], processVersion = processVersionData.toProcessVersion(id.name), user = toManagerUser(user), params = params) - processManager(id.id).flatMap { manager => + deploymentManager(id.id).flatMap { manager => manager.customActions.find(_.name == actionName) match { case Some(customAction) => getProcessStatus(id).flatMap(status => { @@ -138,20 +138,20 @@ class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], private def getProcessStatus(processIdWithName: ProcessIdWithName)(implicit user: LoggedUser): Future[ProcessState] = for { actions <- processRepository.fetchProcessActions(processIdWithName.id) - manager <- processManager(processIdWithName.id) + manager <- deploymentManager(processIdWithName.id) state <- findJobState(manager, processIdWithName) _ <- handleFinishedProcess(processIdWithName, state) } yield handleObsoleteStatus(state, actions.headOption) - private def findJobState(processManager: ProcessManager, processIdWithName: ProcessIdWithName)(implicit user: LoggedUser): Future[Option[ProcessState]] = - processManager.findJobStatus(processIdWithName.name).recover { + private def findJobState(deploymentManager: DeploymentManager, processIdWithName: ProcessIdWithName)(implicit user: LoggedUser): Future[Option[ProcessState]] = + deploymentManager.findJobStatus(processIdWithName.name).recover { case NonFatal(e) => logger.warn(s"Failed to get status of ${processIdWithName.id}: ${e.getMessage}", e) Some(ProcessStatus.failedToGet) } //This method handles some corner cases like retention for keeping old states - some engine can cleanup canceled states. It's more Flink hermetic. - //TODO: In future we should move this functionality to ProcessManager. + //TODO: In future we should move this functionality to DeploymentManager. private def handleObsoleteStatus(processState: Option[ProcessState], lastAction: Option[ProcessAction]): ProcessState = (processState, lastAction) match { case (Some(state), _) if state.status.isFailed => state @@ -163,20 +163,20 @@ class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], case (None, None) => ProcessStatus.simple(SimpleStateStatus.NotDeployed) } - //TODO: In future we should move this functionality to ProcessManager. + //TODO: In future we should move this functionality to DeploymentManager. private def handleState(state: ProcessState, lastAction: Option[ProcessAction]): ProcessState = state.status match { case SimpleStateStatus.NotDeployed if lastAction.isEmpty => ProcessStatus.simple(SimpleStateStatus.NotDeployed) //TODO: Should FlinkStateStatus.Restarting also be here?. Currently it's not handled to - //avoid dependency on FlinkProcessManager + //avoid dependency on FlinkDeploymentManager case SimpleStateStatus.DuringCancel | SimpleStateStatus.Finished if lastAction.isEmpty => ProcessStatus.simpleWarningProcessWithoutAction(Some(state)) case _ => state } //Thise method handles some corner cases for canceled process -> with last action = Canceled - //TODO: In future we should move this functionality to ProcessManager. + //TODO: In future we should move this functionality to DeploymentManager. private def handleCanceledState(processState: Option[ProcessState]): ProcessState = processState match { case Some(state) => state.status match { @@ -186,7 +186,7 @@ class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], } //This method handles some corner cases for following deploy state mismatch last action version - //TODO: In future we should move this functionality to ProcessManager. + //TODO: In future we should move this functionality to DeploymentManager. private def handleFollowingDeployState(state: ProcessState, lastAction: Option[ProcessAction]): ProcessState = lastAction match { case Some(action) if !action.isDeployed => @@ -198,7 +198,7 @@ class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], } //This method handles some corner cases for deployed action mismatch state version - //TODO: In future we should move this functionality to ProcessManager. + //TODO: In future we should move this functionality to DeploymentManager. private def handleMismatchDeployedLastAction(processState: Option[ProcessState], action: ProcessAction): ProcessState = processState match { case Some(state) => @@ -258,7 +258,7 @@ class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], private def cancelProcess(processId: ProcessIdWithName, comment: Option[String])(implicit user: LoggedUser): Future[ProcessActionEntityData] = { for { - manager <- processManager(processId.id) + manager <- deploymentManager(processId.id) _ <- manager.cancel(processId.name, toManagerUser(user)) maybeVersion <- findDeployedVersion(processId) version <- maybeVersion match { @@ -291,7 +291,7 @@ class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], savepointPath: Option[String], comment: Option[String])(implicit user: LoggedUser): Future[ProcessActionEntityData] = { val resolvedDeploymentData = resolveDeploymentData(latestVersion.deploymentData) - val processManagerValue = managers.forTypeUnsafe(processingType) + val deploymentManagerValue = managers.forTypeUnsafe(processingType) for { deploymentResolved <- resolvedDeploymentData @@ -299,7 +299,7 @@ class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], processName = maybeProcessName.getOrElse(throw new IllegalArgumentException(s"Unknown scenario Id ${latestVersion.processId}")) //TODO: deploymentData = DeploymentData(DeploymentId(""), toManagerUser(user), Map.empty) - _ <- processManagerValue.deploy(latestVersion.toProcessVersion(processName), deploymentData, deploymentResolved, savepointPath) + _ <- deploymentManagerValue.deploy(latestVersion.toProcessVersion(processName), deploymentData, deploymentResolved, savepointPath) deployedActionData <- deployedProcessRepository.markProcessAsDeployed( ProcessId(latestVersion.processId), latestVersion.id, processingType, comment ) @@ -322,7 +322,7 @@ class ManagementActor(managers: ProcessingTypeDataProvider[ProcessManager], CatsSyntax.toFuture(validatedGraph)(e => new RuntimeException(e.head.toString)) } - private def processManager(processId: ProcessId)(implicit ec: ExecutionContext, user: LoggedUser): Future[ProcessManager] = { + private def deploymentManager(processId: ProcessId)(implicit ec: ExecutionContext, user: LoggedUser): Future[DeploymentManager] = { processRepository.fetchProcessingType(processId).map(managers.forTypeUnsafe) } diff --git a/ui/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtypedata/ProcessingTypeDataReader.scala b/ui/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtypedata/ProcessingTypeDataReader.scala index 57762dd06d3..875b558bd20 100644 --- a/ui/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtypedata/ProcessingTypeDataReader.scala +++ b/ui/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtypedata/ProcessingTypeDataReader.scala @@ -4,7 +4,7 @@ import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import pl.touk.nussknacker.engine.ProcessingTypeData.ProcessingType import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader -import pl.touk.nussknacker.engine.{ProcessManagerProvider, ProcessingTypeConfig, ProcessingTypeData} +import pl.touk.nussknacker.engine.{DeploymentManagerProvider, ProcessingTypeConfig, ProcessingTypeData} object ProcessingTypeDataReader extends LazyLogging { @@ -13,7 +13,7 @@ object ProcessingTypeDataReader extends LazyLogging { val valueMap = types.map { case (name, typeConfig) => logger.debug(s"Creating scenario manager: $name with config: $typeConfig") - val managerProvider = ScalaServiceLoader.loadNamed[ProcessManagerProvider](typeConfig.engineType) + val managerProvider = ScalaServiceLoader.loadNamed[DeploymentManagerProvider](typeConfig.engineType) name -> ProcessingTypeData.createProcessingTypeData(managerProvider, typeConfig) } new MapBasedProcessingTypeDataProvider[ProcessingTypeData](valueMap) diff --git a/ui/server/src/main/scala/pl/touk/nussknacker/ui/util/LocalNussknackerWithSingleModel.scala b/ui/server/src/main/scala/pl/touk/nussknacker/ui/util/LocalNussknackerWithSingleModel.scala index acb7925251b..bd32beaab09 100644 --- a/ui/server/src/main/scala/pl/touk/nussknacker/ui/util/LocalNussknackerWithSingleModel.scala +++ b/ui/server/src/main/scala/pl/touk/nussknacker/ui/util/LocalNussknackerWithSingleModel.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.ui.util import com.typesafe.config.ConfigValueFactory._ import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.io.FileUtils -import pl.touk.nussknacker.engine.{ModelData, ProcessManagerProvider, ProcessingTypeData} +import pl.touk.nussknacker.engine.{ModelData, DeploymentManagerProvider, ProcessingTypeData} import pl.touk.nussknacker.ui.process.processingtypedata.{BasicProcessingTypeDataReload, MapBasedProcessingTypeDataProvider, ProcessingTypeDataProvider, ProcessingTypeDataReload} import pl.touk.nussknacker.ui.{NusskanckerDefaultAppRouter, NussknackerAppInitializer} @@ -19,13 +19,13 @@ object LocalNussknackerWithSingleModel { val typeName = "streaming" def run(modelData: ModelData, - processManagerProvider: ProcessManagerProvider, + deploymentManagerProvider: DeploymentManagerProvider, managerConfig: Config, categories: Set[String]): Unit = { val router = new NusskanckerDefaultAppRouter { override protected def prepareProcessingTypeData(config: Config): (ProcessingTypeDataProvider[ProcessingTypeData], ProcessingTypeDataReload) = { //TODO: figure out how to perform e.g. hotswap BasicProcessingTypeDataReload.wrapWithReloader(() => { - val data = ProcessingTypeData.createProcessingTypeData(processManagerProvider, modelData, managerConfig) + val data = ProcessingTypeData.createProcessingTypeData(deploymentManagerProvider, modelData, managerConfig) new MapBasedProcessingTypeDataProvider(Map(typeName -> data)) }) } diff --git a/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala b/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala index 3756dd9ba4c..81ebb7b54d4 100644 --- a/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala +++ b/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala @@ -62,7 +62,7 @@ class ManagementResourcesSpec extends FunSuite with ScalatestRouteTest with Fail test("process during deploy can't be deploy again") { createDeployedProcess(processName, testCategoryName, isSubprocess = false) - processManager.withProcessStateStatus(SimpleStateStatus.DuringDeploy) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.DuringDeploy) { deployProcess(processName.value) ~> check { status shouldBe StatusCodes.Conflict } @@ -72,7 +72,7 @@ class ManagementResourcesSpec extends FunSuite with ScalatestRouteTest with Fail test("canceled process can't be canceled again") { createDeployedCanceledProcess(processName, testCategoryName, isSubprocess = false) - processManager.withProcessStateStatus(SimpleStateStatus.Canceled) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Canceled) { cancelProcess(processName.value) ~> check { status shouldBe StatusCodes.Conflict } @@ -83,7 +83,7 @@ class ManagementResourcesSpec extends FunSuite with ScalatestRouteTest with Fail val id = createArchivedProcess(processName) val processIdWithName = ProcessIdWithName(id, processName) - processManager.withProcessStateStatus(SimpleStateStatus.Canceled) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Canceled) { deployProcess(processName.value) ~> check { status shouldBe StatusCodes.Conflict responseAs[String] shouldBe ProcessIllegalAction.archived(ProcessActionType.Deploy, processIdWithName).message @@ -203,7 +203,7 @@ class ManagementResourcesSpec extends FunSuite with ScalatestRouteTest with Fail test("return error on deployment failure") { saveProcessAndAssertSuccess(SampleProcess.process.id, SampleProcess.process) - processManager.withFailingDeployment { + deploymentManager.withFailingDeployment { deployProcess(SampleProcess.process.id) ~> check { status shouldBe StatusCodes.InternalServerError } @@ -214,7 +214,7 @@ class ManagementResourcesSpec extends FunSuite with ScalatestRouteTest with Fail saveProcessAndAssertSuccess(SampleProcess.process.id, SampleProcess.process) snapshot(SampleProcess.process.id) ~> check { status shouldBe StatusCodes.OK - responseAs[String] shouldBe MockProcessManager.savepointPath + responseAs[String] shouldBe MockDeploymentManager.savepointPath } } @@ -222,7 +222,7 @@ class ManagementResourcesSpec extends FunSuite with ScalatestRouteTest with Fail saveProcessAndAssertSuccess(SampleProcess.process.id, SampleProcess.process) stop(SampleProcess.process.id) ~> check { status shouldBe StatusCodes.OK - responseAs[String] shouldBe MockProcessManager.stopSavepointPath + responseAs[String] shouldBe MockDeploymentManager.stopSavepointPath } } diff --git a/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesChangeListenerSpec.scala b/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesChangeListenerSpec.scala index 1b29cbda0f8..c88dd6582f3 100644 --- a/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesChangeListenerSpec.scala +++ b/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesChangeListenerSpec.scala @@ -92,7 +92,7 @@ class ProcessesChangeListenerSpec extends FunSuite with ScalatestRouteTest with test("listen to deployment failure") { val processId = createProcess(processName, testCategoryName, false) - processManager.withFailingDeployment { + deploymentManager.withFailingDeployment { deployProcess(processName.value) ~> checkEventually { TestProcessChangeListener.events.head should matchPattern { case OnDeployActionFailed(`processId`, _) => } } diff --git a/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesResourcesSpec.scala b/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesResourcesSpec.scala index 4eb681745b8..a9cdcbe4325 100644 --- a/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesResourcesSpec.scala +++ b/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesResourcesSpec.scala @@ -48,7 +48,7 @@ class ProcessesResourcesSpec extends FunSuite with ScalatestRouteTest with Match private implicit final val string: FromEntityUnmarshaller[String] = Unmarshaller.stringUnmarshaller.forContentTypes(ContentTypeRange.*) - override protected def createProcessManager(): MockProcessManager = new MockProcessManager(SimpleStateStatus.NotDeployed) + override protected def createDeploymentManager(): MockDeploymentManager = new MockDeploymentManager(SimpleStateStatus.NotDeployed) val routeWithRead: Route = withPermissions(processesRoute, testPermissionRead) val routeWithWrite: Route = withPermissions(processesRoute, testPermissionWrite) @@ -94,7 +94,7 @@ class ProcessesResourcesSpec extends FunSuite with ScalatestRouteTest with Match test("not allow to archive still running process") { createDeployedProcess(processName) - processManager.withProcessStateStatus(SimpleStateStatus.Running) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Running) { archiveProcess(processName) { status => status shouldEqual StatusCodes.Conflict } @@ -176,7 +176,7 @@ class ProcessesResourcesSpec extends FunSuite with ScalatestRouteTest with Match createProcess(processName) val newName = ProcessName("ProcessChangedName") - processManager.withProcessStateStatus(SimpleStateStatus.Running) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Running) { renameProcess(processName, newName) { status => status shouldEqual StatusCodes.Conflict } @@ -734,7 +734,7 @@ class ProcessesResourcesSpec extends FunSuite with ScalatestRouteTest with Match test("fetching status for deployed process should properly return status") { createDeployedProcess(processName) - processManager.withProcessStateStatus(SimpleStateStatus.Running) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Running) { Get(s"/processes/${processName.value}/status") ~> routeWithAllPermissions ~> check { status shouldEqual StatusCodes.OK val stateStatusResponse = parseStateResponse(responseAs[Json]) diff --git a/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/helpers/EspItTest.scala b/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/helpers/EspItTest.scala index 07b68623267..405b90b0455 100644 --- a/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/helpers/EspItTest.scala +++ b/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/helpers/EspItTest.scala @@ -16,10 +16,10 @@ import org.scalatest.concurrent.ScalaFutures import pl.touk.nussknacker.engine.{ModelData, ProcessingTypeConfig, ProcessingTypeData} import pl.touk.nussknacker.engine.ProcessingTypeData.ProcessingType import pl.touk.nussknacker.engine.api.StreamMetaData -import pl.touk.nussknacker.engine.api.deployment.{CustomProcess, GraphProcess, ProcessActionType, ProcessManager} +import pl.touk.nussknacker.engine.api.deployment.{CustomProcess, GraphProcess, ProcessActionType, DeploymentManager} import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.graph.EspProcess -import pl.touk.nussknacker.engine.management.FlinkStreamingProcessManagerProvider +import pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider import pl.touk.nussknacker.engine.marshall.ProcessMarshaller import pl.touk.nussknacker.restmodel.displayedgraph.DisplayableProcess import pl.touk.nussknacker.restmodel.{process, processdetails} @@ -64,19 +64,19 @@ trait EspItTest extends LazyLogging with WithHsqlDbTesting with TestPermissions val existingProcessingType = "streaming" - protected def createProcessManager(): MockProcessManager = new MockProcessManager + protected def createDeploymentManager(): MockDeploymentManager = new MockDeploymentManager - lazy val processManager = createProcessManager() + lazy val deploymentManager = createDeploymentManager() - val processManagerProvider = new FlinkStreamingProcessManagerProvider { - override def createProcessManager(modelData: ModelData, config: Config): ProcessManager = processManager + val deploymentManagerProvider = new FlinkStreamingDeploymentManagerProvider { + override def createDeploymentManager(modelData: ModelData, config: Config): DeploymentManager = deploymentManager } val processChangeListener = new TestProcessChangeListener() def createManagementActorRef: ActorRef = { system.actorOf(ManagementActor.props( - mapProcessingTypeDataProvider(TestProcessingTypes.Streaming -> processManager), + mapProcessingTypeDataProvider(TestProcessingTypes.Streaming -> deploymentManager), fetchingProcessRepository, actionRepository, TestFactory.sampleResolver, @@ -121,7 +121,7 @@ trait EspItTest extends LazyLogging with WithHsqlDbTesting with TestPermissions val processingTypeConfig = ProcessingTypeConfig.read(ConfigWithScalaVersion.streamingProcessTypeConfig) val definitionResources = new DefinitionResources( modelDataProvider = mapProcessingTypeDataProvider(existingProcessingType -> processingTypeConfig.toModelData), - processingTypeDataProvider = mapProcessingTypeDataProvider(existingProcessingType -> ProcessingTypeData.createProcessingTypeData(processManagerProvider, processingTypeConfig)), + processingTypeDataProvider = mapProcessingTypeDataProvider(existingProcessingType -> ProcessingTypeData.createProcessingTypeData(deploymentManagerProvider, processingTypeConfig)), subprocessRepository, processCategoryService) diff --git a/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/helpers/TestFactory.scala b/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/helpers/TestFactory.scala index a688952a318..95fc86bbffd 100644 --- a/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/helpers/TestFactory.scala +++ b/ui/server/src/test/scala/pl/touk/nussknacker/ui/api/helpers/TestFactory.scala @@ -13,7 +13,7 @@ import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.api.{ProcessAdditionalFields, ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.graph.exceptionhandler.ExceptionHandlerRef -import pl.touk.nussknacker.engine.management.FlinkProcessManager +import pl.touk.nussknacker.engine.management.FlinkDeploymentManager import pl.touk.nussknacker.restmodel.displayedgraph.{DisplayableProcess, ProcessProperties} import pl.touk.nussknacker.ui.api.helpers.TestPermissions.CategorizedPermission import pl.touk.nussknacker.ui.api.{RouteWithUser, RouteWithoutUser} @@ -115,14 +115,14 @@ object TestFactory extends TestPermissions{ def emptyProcessingTypeDataProvider = new MapBasedProcessingTypeDataProvider[Nothing](Map.empty) - object MockProcessManager { + object MockDeploymentManager { val savepointPath = "savepoints/123-savepoint" val stopSavepointPath = "savepoints/246-stop-savepoint" } - class MockProcessManager(val defaultProcessStateStatus: StateStatus) extends FlinkProcessManager(ProcessingTypeConfig.read(ConfigWithScalaVersion.streamingProcessTypeConfig).toModelData, shouldVerifyBeforeDeploy = false, mainClassName = "UNUSED"){ + class MockDeploymentManager(val defaultProcessStateStatus: StateStatus) extends FlinkDeploymentManager(ProcessingTypeConfig.read(ConfigWithScalaVersion.streamingProcessTypeConfig).toModelData, shouldVerifyBeforeDeploy = false, mainClassName = "UNUSED"){ - import MockProcessManager._ + import MockDeploymentManager._ def this() { this(SimpleStateStatus.Running) diff --git a/ui/server/src/test/scala/pl/touk/nussknacker/ui/definition/UIProcessObjectsFactorySpec.scala b/ui/server/src/test/scala/pl/touk/nussknacker/ui/definition/UIProcessObjectsFactorySpec.scala index 535e364353b..4ee9d9c6f68 100644 --- a/ui/server/src/test/scala/pl/touk/nussknacker/ui/definition/UIProcessObjectsFactorySpec.scala +++ b/ui/server/src/test/scala/pl/touk/nussknacker/ui/definition/UIProcessObjectsFactorySpec.scala @@ -12,7 +12,7 @@ import pl.touk.nussknacker.engine.testing.LocalModelData import pl.touk.nussknacker.engine.util.process.EmptyProcessConfigCreator import pl.touk.nussknacker.engine.{ModelData, ProcessingTypeConfig} import pl.touk.nussknacker.ui.api.helpers.TestFactory -import pl.touk.nussknacker.ui.api.helpers.TestFactory.MockProcessManager +import pl.touk.nussknacker.ui.api.helpers.TestFactory.MockDeploymentManager import pl.touk.nussknacker.ui.process.ConfigProcessCategoryService import pl.touk.nussknacker.ui.util.ConfigWithScalaVersion @@ -62,7 +62,7 @@ class UIProcessObjectsFactorySpec extends FunSuite with Matchers { } - private val mockProcessManager = new MockProcessManager + private val mockDeploymentManager = new MockDeploymentManager test("should read editor from annotations") { val model: ModelData = LocalModelData(ConfigWithScalaVersion.streamingProcessTypeConfig, new EmptyProcessConfigCreator() { @@ -72,7 +72,7 @@ class UIProcessObjectsFactorySpec extends FunSuite with Matchers { val processObjects = UIProcessObjectsFactory.prepareUIProcessObjects( model, - mockProcessManager, + mockDeploymentManager, TestFactory.user("userId"), Set(), false, @@ -101,7 +101,7 @@ class UIProcessObjectsFactorySpec extends FunSuite with Matchers { }) val processObjects = - UIProcessObjectsFactory.prepareUIProcessObjects(model, mockProcessManager, TestFactory.user("userId"), Set(), false, + UIProcessObjectsFactory.prepareUIProcessObjects(model, mockDeploymentManager, TestFactory.user("userId"), Set(), false, new ConfigProcessCategoryService(ConfigWithScalaVersion.config)) processObjects.nodesToAdd.filter(_.name == "hiddenCategory") shouldBe empty @@ -117,7 +117,7 @@ class UIProcessObjectsFactorySpec extends FunSuite with Matchers { }) val processObjects = - UIProcessObjectsFactory.prepareUIProcessObjects(model, mockProcessManager, TestFactory.user("userId"), Set(), false, + UIProcessObjectsFactory.prepareUIProcessObjects(model, mockDeploymentManager, TestFactory.user("userId"), Set(), false, new ConfigProcessCategoryService(ConfigWithScalaVersion.config)) val nodeGroups = processObjects.nodesToAdd.filter(_.name == "someCategory") diff --git a/ui/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/ManagementActorSpec.scala b/ui/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/ManagementActorSpec.scala index b1a08dc511e..2fec469c470 100644 --- a/ui/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/ManagementActorSpec.scala +++ b/ui/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/ManagementActorSpec.scala @@ -10,7 +10,7 @@ import pl.touk.nussknacker.engine.management.{FlinkProcessStateDefinitionManager import pl.touk.nussknacker.restmodel.process import pl.touk.nussknacker.restmodel.process.ProcessIdWithName import pl.touk.nussknacker.test.PatientScalaFutures -import pl.touk.nussknacker.ui.api.helpers.TestFactory.{MockProcessManager, mapProcessingTypeDataProvider, newActionProcessRepository, newDBRepositoryManager, newFetchingProcessRepository, newProcessActivityRepository, newWriteProcessRepository, processResolving, testCategoryName} +import pl.touk.nussknacker.ui.api.helpers.TestFactory.{MockDeploymentManager, mapProcessingTypeDataProvider, newActionProcessRepository, newDBRepositoryManager, newFetchingProcessRepository, newProcessActivityRepository, newWriteProcessRepository, processResolving, testCategoryName} import pl.touk.nussknacker.ui.api.helpers.{ProcessTestData, TestFactory, TestProcessingTypes, WithHsqlDbTesting} import pl.touk.nussknacker.ui.listener.ProcessChangeListener import pl.touk.nussknacker.ui.process.repository.ProcessRepository.CreateProcessAction @@ -28,7 +28,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture private implicit val ds: ExecutionContextExecutor = system.dispatcher val processName: ProcessName = ProcessName("proces1") - private val processManager = new MockProcessManager + private val deploymentManager = new MockDeploymentManager private val repositoryManager = newDBRepositoryManager(db) private val fetchingProcessRepository = newFetchingProcessRepository(db) private val writeProcessRepository = newWriteProcessRepository(db) @@ -44,7 +44,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture private val managementActor = system.actorOf( ManagementActor.props( - mapProcessingTypeDataProvider(TestProcessingTypes.Streaming -> processManager), + mapProcessingTypeDataProvider(TestProcessingTypes.Streaming -> deploymentManager), fetchingProcessRepository, actionRepository, TestFactory.sampleResolver, @@ -61,7 +61,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("should return state correctly when state is deployed") { val id: process.ProcessId = prepareProcess(processName).futureValue - processManager.withWaitForDeployFinish { + deploymentManager.withWaitForDeployFinish { managementActor ! Deploy(ProcessIdWithName(id, processName), user, None, None) processService.getProcessState(ProcessIdWithName(id, processName)).futureValue.status shouldBe SimpleStateStatus.DuringDeploy } @@ -76,7 +76,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture isFollowingDeploy(processService.getProcessState(ProcessIdWithName(id, processName)).futureValue) shouldBe true fetchingProcessRepository.fetchLatestProcessDetailsForProcessId[Unit](id).futureValue.get.lastAction should not be None - processManager.withProcessFinished { + deploymentManager.withProcessFinished { //we simulate what happens when retrieveStatus is called mulitple times to check only one comment is added (1 to 5).foreach { _ => isFollowingDeploy(processService.getProcessState(ProcessIdWithName(id, processName)).futureValue) shouldBe false @@ -98,7 +98,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return properly state when state is canceled and process is canceled") { val id = prepareCanceledProcess(processName).futureValue - processManager.withProcessStateStatus(SimpleStateStatus.Canceled) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Canceled) { processService.getProcessState(ProcessIdWithName(id, processName)).futureValue.status shouldBe SimpleStateStatus.Canceled } } @@ -108,7 +108,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture fetchingProcessRepository.fetchLatestProcessDetailsForProcessId[Unit](id).futureValue.get.lastAction should not be None - processManager.withEmptyProcessState { + deploymentManager.withEmptyProcessState { processService.getProcessState(ProcessIdWithName(id, processName)).futureValue.status shouldBe SimpleStateStatus.Canceled } @@ -123,7 +123,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture fetchingProcessRepository.fetchLatestProcessDetailsForProcessId[Unit](id).futureValue.get.lastAction should not be None - processManager.withEmptyProcessState { + deploymentManager.withEmptyProcessState { processService.getProcessState(ProcessIdWithName(id, processName)).futureValue.status shouldBe SimpleStateStatus.Canceled } @@ -136,7 +136,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return state with warning when state is running and process is canceled") { val id = prepareCanceledProcess(processName).futureValue - processManager.withProcessStateStatus(SimpleStateStatus.Running) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Running) { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.Warning @@ -149,7 +149,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return state with warning when state is running and process is not deployed") { val id = prepareProcess(processName).futureValue - processManager.withProcessStateStatus(SimpleStateStatus.Running) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Running) { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.Warning @@ -162,7 +162,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return state with warning when state is during canceled and process hasn't action") { val id = prepareProcess(processName).futureValue - processManager.withProcessStateStatus(SimpleStateStatus.DuringCancel) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.DuringCancel) { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.Warning @@ -175,7 +175,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return state with error when state is finished and process hasn't action") { val id = prepareProcess(processName).futureValue - processManager.withProcessStateStatus(SimpleStateStatus.Finished) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Finished) { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.Warning @@ -189,7 +189,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture val state = ProcessState("12", FlinkStateStatus.Restarting, Some(ProcessVersion.empty), FlinkProcessStateDefinitionManager) - processManager.withProcessState(Some(state)) { + deploymentManager.withProcessState(Some(state)) { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue //See comment in ManagementActor.handleState... @@ -203,7 +203,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return state with error when state is not running and process is deployed") { val id = prepareDeployedProcess(processName).futureValue - processManager.withProcessStateStatus(SimpleStateStatus.Canceled) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Canceled) { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.Error @@ -216,7 +216,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return state with error when state is null and process is deployed") { val id = prepareDeployedProcess(processName).futureValue - processManager.withEmptyProcessState { + deploymentManager.withEmptyProcessState { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.Error @@ -230,7 +230,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture val id = prepareDeployedProcess(processName).futureValue val version = Some(ProcessVersion(versionId = 2, processName = ProcessName(""), user = "", modelVersion = None)) - processManager.withProcessStateVersion(SimpleStateStatus.Running, version) { + deploymentManager.withProcessStateVersion(SimpleStateStatus.Running, version) { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.Error @@ -244,7 +244,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture val id = prepareDeployedProcess(processName).futureValue val version = Some(ProcessVersion(versionId = 2, processName = ProcessName(""), user = "", modelVersion = None)) - processManager.withProcessStateVersion(SimpleStateStatus.Failed, version) { + deploymentManager.withProcessStateVersion(SimpleStateStatus.Failed, version) { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.Failed @@ -255,7 +255,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return warning state when state is running with empty version and process is deployed") { val id = prepareDeployedProcess(processName).futureValue - processManager.withProcessStateVersion(SimpleStateStatus.Running, Option.empty) { + deploymentManager.withProcessStateVersion(SimpleStateStatus.Running, Option.empty) { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.Warning @@ -269,7 +269,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture val id = prepareProcess(processName).futureValue fetchingProcessRepository.fetchLatestProcessDetailsForProcessId[Unit](id).futureValue.get.lastAction shouldBe None - processManager.withEmptyProcessState { + deploymentManager.withEmptyProcessState { processService.getProcessState(ProcessIdWithName(id, processName)).futureValue.status shouldBe SimpleStateStatus.NotDeployed } @@ -282,7 +282,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture val id = prepareProcess(processName).futureValue fetchingProcessRepository.fetchLatestProcessDetailsForProcessId[Unit](id).futureValue.get.lastAction shouldBe None - processManager.withEmptyProcessState { + deploymentManager.withEmptyProcessState { processService.getProcessState(ProcessIdWithName(id, processName)).futureValue.status shouldBe SimpleStateStatus.NotDeployed } @@ -293,7 +293,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return NotDeployed state for archived process with missing state") { val id = prepareArchivedProcess(processName).futureValue - processManager.withEmptyProcessState { + deploymentManager.withEmptyProcessState { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.NotDeployed @@ -302,7 +302,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return NotDeployed state for unarchived process with missing state") { val id = prepareUnArchivedProcess(processName).futureValue - processManager.withEmptyProcessState { + deploymentManager.withEmptyProcessState { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.NotDeployed @@ -312,7 +312,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return any status for archived process with any available state") { val id = prepareArchivedProcess(processName).futureValue - processManager.withProcessStateStatus(SimpleStateStatus.Canceled) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Canceled) { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.Canceled @@ -322,7 +322,7 @@ class ManagementActorSpec extends FunSuite with Matchers with PatientScalaFuture test("Should return warning status for archived process with running state") { val id = prepareArchivedProcess(processName).futureValue - processManager.withProcessStateStatus(SimpleStateStatus.Running) { + deploymentManager.withProcessStateStatus(SimpleStateStatus.Running) { val state = processService.getProcessState(ProcessIdWithName(id, processName)).futureValue state.status shouldBe SimpleStateStatus.Warning