Rename ProcessManager to DeploymentManager (#1921)
* ProcessManager -> DeploymentManager

* review

* review
mproch authored Jul 21, 2021
1 parent 984d30b commit 9b8479b
Showing 54 changed files with 275 additions and 273 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
@@ -148,7 +148,7 @@ jobs:
run: tar xfz target.tgz
- name: Integration tests
shell: bash
-run: ./ciRunSbt.sh flinkProcessManager/it:test engineStandalone/it:test processReports/it:test security/it:test
+run: ./ciRunSbt.sh flinkDeploymentManager/it:test engineStandalone/it:test processReports/it:test security/it:test
slowTests:
name: Slow tests
runs-on: ubuntu-latest
18 changes: 9 additions & 9 deletions build.sbt
@@ -339,14 +339,14 @@ lazy val dist = {
packageName in Universal := ("nussknacker" + "-" + version.value),
Keys.compile in Compile := (Keys.compile in Compile).dependsOn(
(assembly in Compile) in generic,
-(assembly in Compile) in flinkProcessManager,
+(assembly in Compile) in flinkDeploymentManager,
(assembly in Compile) in engineStandalone,
(assembly in Compile) in openapi,
(assembly in Compile) in sql,
).value,
mappings in Universal ++= Seq(
(crossTarget in generic).value / "genericModel.jar" -> "model/genericModel.jar",
-(crossTarget in flinkProcessManager).value / "nussknacker-flink-manager.jar" -> "managers/nussknacker-flink-manager.jar",
+(crossTarget in flinkDeploymentManager).value / "nussknacker-flink-manager.jar" -> "managers/nussknacker-flink-manager.jar",
(crossTarget in engineStandalone).value / "nussknacker-standalone-manager.jar" -> "managers/nussknacker-standalone-manager.jar",
(crossTarget in openapi).value / "openapi.jar" -> "components/openapi.jar",
(crossTarget in sql).value / "sql.jar" -> "components/sql.jar"
@@ -450,7 +450,7 @@ lazy val standaloneApp = (project in engine("standalone/app")).
dependsOn(engineStandalone, interpreter, httpUtils, testUtil % "test", standaloneUtil % "test")


-lazy val flinkProcessManager = (project in engine("flink/management")).
+lazy val flinkDeploymentManager = (project in engine("flink/management")).
configs(IntegrationTest).
settings(commonSettings).
settings(Defaults.itSettings).
@@ -483,7 +483,7 @@ lazy val flinkProcessManager = (project in engine("flink/management")).
httpUtils % "provided",
kafkaTestUtil % "it,test")

-lazy val flinkPeriodicProcessManager = (project in engine("flink/management/periodic")).
+lazy val flinkPeriodicDeploymentManager = (project in engine("flink/management/periodic")).
settings(commonSettings).
settings(assemblySettings("nussknacker-flink-periodic-manager.jar", includeScala = false): _*).
settings(
@@ -497,7 +497,7 @@ lazy val flinkPeriodicProcessManager = (project in engine("flink/management/periodic")).
"com.cronutils" % "cron-utils" % cronParserV
)
}
-).dependsOn(flinkProcessManager,
+).dependsOn(flinkDeploymentManager,
interpreter % "provided",
api % "provided",
httpUtils % "provided",
@@ -1034,7 +1034,7 @@ lazy val ui = (project in file("ui/server"))
"org.slf4j" % "log4j-over-slf4j" % slf4jV,
"com.carrotsearch" % "java-sizeof" % "0.0.5",

-//It's needed by flinkProcessManager which has disabled includingScala
+//It's needed by flinkDeploymentManager which has disabled includingScala
"org.scala-lang" % "scala-compiler" % scalaVersion.value,
"org.scala-lang" % "scala-reflect" % scalaVersion.value,

@@ -1057,7 +1057,7 @@ lazy val ui = (project in file("ui/server"))
//TODO: this is unfortunately needed to run without too much hassle in Intellij...
//provided dependency of kafka is a workaround for Idea, which is not able to handle test scope on module dependency
//otherwise it is (wrongly) added to classpath when running UI from Idea
flinkProcessManager % "provided" ,
flinkDeploymentManager % "provided" ,
kafka % "provided",
engineStandalone % "provided"
)
@@ -1100,7 +1100,7 @@ lazy val bom = (project in file("bom"))
).dependsOn(modules.map(k => k:ClasspathDep[ProjectReference]):_*)

lazy val modules = List[ProjectReference](
-engineStandalone, standaloneApp, flinkProcessManager, flinkPeriodicProcessManager, standaloneSample, flinkManagementSample, managementJavaSample, generic,
+engineStandalone, standaloneApp, flinkDeploymentManager, flinkPeriodicDeploymentManager, standaloneSample, flinkManagementSample, managementJavaSample, generic,
openapi, process, interpreter, benchmarks, kafka, avroFlinkUtil, kafkaFlinkUtil, kafkaTestUtil, util, testUtil, flinkUtil, flinkModelUtil,
flinkTestUtil, standaloneUtil, standaloneApi, api, security, flinkApi, processReports, httpUtils, queryableState,
restmodel, listenerApi, ui, sql
@@ -1139,4 +1139,4 @@ lazy val root = (project in file("."))
)

addCommandAlias("assemblySamples", ";flinkManagementSample/assembly;standaloneSample/assembly;generic/assembly")
addCommandAlias("assemblyEngines", ";flinkProcessManager/assembly;engineStandalone/assembly")
addCommandAlias("assemblyDeploymentManagers", ";flinkDeploymentManager/assembly;engineStandalone/assembly")
15 changes: 8 additions & 7 deletions docs/Architecture.md
@@ -2,18 +2,19 @@

Nussknacker consists of three parts:

-**engines** - are **libraries** which transform internal json process representation (process graph) into jobs. For example Flink engine generates Flink specific code, compiles and packages all needed components into JAR file for execution, and then runs the job via Flink REST API.
+**engines** - are **libraries** which transform the internal JSON scenario representation (scenario graph) into jobs. For example, the Flink engine generates Flink-specific code, compiles and packages all needed components into a JAR file for execution, and then runs the job via the Flink REST API.

-**ui** - is a standalone **application** which allows users to design process diagrams and to deploy them into runtime environments.
+**ui** - is a standalone **application** which allows users to design scenario diagrams and to deploy them into runtime environments.

**integrations** - are your application-specific classes like your model, HTTP services, or custom stateful components.


-##Engine
-Engine consists of various modules that enable creation of processes building blocks in UI and interpretation of process diagrams. Engines implement `ProcessManagerProvider`.
+## Engines
+An engine consists of various modules that enable creation of scenario building blocks in the UI and interpretation of scenario diagrams.
+Scenarios can be deployed on an engine (like Flink) using a `DeploymentManager` - e.g., for Flink it controls job submission to a particular Flink cluster.

-##UI
-The **ui** application is a simple application written using Scala, Akka Http and Slick on the backend side and ReactJS on the front. Processes, their history, comments and other metadata are persisted in relational database (by default it's simple embedded H2). UI communicates with Apache Flink cluster using embedded Flink client.
+## UI
+The **ui** application is a simple application written using Scala, Akka Http and Slick on the backend side and ReactJS on the front. Scenarios, their history, comments and other metadata are persisted in a relational database (by default a simple embedded H2). The UI communicates with the Apache Flink cluster using an embedded Flink client.

-##Integrations
+## Integrations
The integrations module implements the `ProcessConfigCreator` interface, which is the entry point of Nussknacker. See [API](API.md) for more details.
2 changes: 1 addition & 1 deletion docs/Changelog.md
@@ -61,8 +61,8 @@ Nussknacker versions
* Various naming changes:
* [#1917](https://github.com/TouK/nussknacker/pull/1917) configuration of `engineConfig` to `deploymentConfig`
* [#1911](https://github.com/TouK/nussknacker/pull/1911) Rename `process` to `scenario`, `subprocess` to `fragment` in messages at backend and some test cases names
+* [#1921](https://github.com/TouK/nussknacker/pull/1921) `ProcessManager` to `DeploymentManager`
* [#1927](https://github.com/TouK/nussknacker/pull/1927) Rename `outer-join` to `single-side-join`


0.3.1 (not released yet)
------------------------
4 changes: 2 additions & 2 deletions docs/Engines.md
@@ -1,11 +1,11 @@
Engines
=======
Nussknacker was created as a GUI for Flink. However, it's also possible to use it to connect to other runtimes.
-```ProcessManager``` and ```ProcessManagerProvider``` interfaces were created to facilitate this.
+```DeploymentManager``` and ```DeploymentManagerProvider``` interfaces were created to facilitate this.
In particular, we provide an experimental standalone engine, which allows using Nussknacker as a method of creating REST APIs.

To create/customize Nussknacker engine you have to:
-- Implement ```ProcessManagerProvider``` interface and register it with ServiceLoader mechanism.
+- Implement the ```DeploymentManagerProvider``` interface and register it with the ServiceLoader mechanism.
Each provider should have a unique ```name```.
- Put the implementation with all needed libraries on the Nussknacker classpath.
- Configure the appropriate model to use your deployment manager.
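For illustration, a minimal sketch of a custom provider. It implements only the two members visible in this commit's diff (`name` and `createDeploymentManager`); any further members of the interface are omitted here, and `???` stands in for a real `DeploymentManager` implementation:

```scala
import com.typesafe.config.Config
import pl.touk.nussknacker.engine.{DeploymentManagerProvider, ModelData}
import pl.touk.nussknacker.engine.api.deployment.DeploymentManager

// Sketch only - other abstract members of the provider interface (if any) are not shown.
class MyDeploymentManagerProvider extends DeploymentManagerProvider {

  // Each provider should have a unique name; it is referenced from configuration.
  override def name: String = "myRuntime"

  override def createDeploymentManager(modelData: ModelData, config: Config): DeploymentManager =
    ??? // construct your manager here, e.g. using settings read from `config`
}

// ServiceLoader registration: ship a resource file named
// META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider
// containing the fully qualified class name of MyDeploymentManagerProvider.
```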
7 changes: 4 additions & 3 deletions docs/MigrationGuide.md
@@ -152,6 +152,10 @@ To see biggest differences please consult the [changelog](Changelog.md).
doesn't emit full context of variables that were before node (because of performance reasons and because that wasn't obvious which one context is emitted).
If you want to emit some information other than aggregated value and key (available via the new `#key` variable), you should use the `#AGG.map` expression in `aggregateBy`.
* [#1910](https://github.com/TouK/nussknacker/pull/1910) `processTypes` renamed to `scenarioTypes`. You can still use old `processTypes` configuration. Old configuration will be removed in version `0.5.0`.
+* Various naming changes:
+* [#1917](https://github.com/TouK/nussknacker/pull/1917) configuration of `engineConfig` to `deploymentConfig`
+* [#1921](https://github.com/TouK/nussknacker/pull/1921) `ProcessManager` to `DeploymentManager`
+* [#1927](https://github.com/TouK/nussknacker/pull/1927) Rename `outer-join` to `single-side-join`
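One concrete effect of the `ProcessManager` → `DeploymentManager` rename is visible in the periodic provider later in this diff, which now reads its batch settings from the `deploymentManager` config path instead of `processManager`. A minimal sketch of the corresponding config migration, with a placeholder key rather than real settings:

```scala
import com.typesafe.config.ConfigFactory

// Before this commit the periodic settings lived under `processManager`:
val before = ConfigFactory.parseString(
  """
  processManager {
    someSetting: "value"  // placeholder key, not a real PeriodicBatchConfig field
  }
  """)

// After the rename the same subtree is read from `deploymentManager`:
val after = ConfigFactory.parseString(
  """
  deploymentManager {
    someSetting: "value"
  }
  """)
```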

## In version 0.3.0

@@ -182,9 +186,6 @@ that will be hidden before parameter's evaluation
- Removed: `getClusterClient` from `FlinkMiniClusterHolder` interface, because of flink compatibility at Flink 1.9
- Renamed: `FlinkStreamingProcessRegistrar` to `FlinkProcessManager`
* [#1303](https://github.com/TouK/nussknacker/pull/1303) TypedObjectTypingResult has a new field: additionalInfo
-* Various naming changes:
-* [#1917](https://github.com/TouK/nussknacker/pull/1917) configuration of `engineConfig` to `deploymentConfig`
-* [#1927](https://github.com/TouK/nussknacker/pull/1927) Rename `outer-join` to `single-side-join`

## In version 0.2.0

2 changes: 1 addition & 1 deletion docs/operations_guide/Operations.md
@@ -71,7 +71,7 @@ It’s possible to configure Nussknacker installation to use other metrics setup

### Common Flink configuration issues

-Nussknacker assumes that the Flink Session Cluster is used (it should be possible to write own, custom `ProcessManager` to deploy with Job/Application mode,
+Nussknacker assumes that the Flink Session Cluster is used (it should be possible to write your own custom `DeploymentManager` to deploy with Job/Application mode,
but this is out of scope of this guide).

It usually happens (especially for large deployments) that the Flink cluster used with Nussknacker has quite a lot of jobs (each representing one scenario), many of them quite small in terms of needed resources - this is different from the usual Flink setup, where a cluster has one or a few jobs.
@@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.process.ProcessName

import scala.concurrent.Future

-trait ProcessManager extends AutoCloseable {
+trait DeploymentManager extends AutoCloseable {

//TODO: savepointPath is very flink specific, how can we handle that differently?
def deploy(processVersion: ProcessVersion, deploymentData: DeploymentData, processDeploymentData: ProcessDeploymentData, savepointPath: Option[String]) : Future[Option[ExternalDeploymentId]]
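The fragment above is truncated. As an illustrative sketch, here is a no-op implementation of just the part shown; the import paths are assumed from the surrounding diff, and the trait's remaining abstract members (elided above) would also need overriding:

```scala
import scala.concurrent.Future
import pl.touk.nussknacker.engine.api.ProcessVersion // import paths assumed, not shown in this diff
import pl.touk.nussknacker.engine.api.deployment.{DeploymentData, DeploymentManager, ExternalDeploymentId, ProcessDeploymentData}

// No-op sketch: accepts every deployment without starting anything.
class NoOpDeploymentManager extends DeploymentManager {

  override def deploy(processVersion: ProcessVersion,
                      deploymentData: DeploymentData,
                      processDeploymentData: ProcessDeploymentData,
                      savepointPath: Option[String]): Future[Option[ExternalDeploymentId]] =
    Future.successful(None) // no external deployment id to report

  override def close(): Unit = () // from AutoCloseable

  // ...the trait's remaining methods (truncated in the diff above) are omitted in this sketch
}
```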
@@ -1,8 +1,8 @@
package pl.touk.nussknacker.genericmodel

import com.typesafe.config.ConfigFactory
-import pl.touk.nussknacker.engine.ProcessManagerProvider
-import pl.touk.nussknacker.engine.testing.{LocalModelData, ProcessManagerProviderStub}
+import pl.touk.nussknacker.engine.DeploymentManagerProvider
+import pl.touk.nussknacker.engine.testing.{LocalModelData, DeploymentManagerProviderStub}
import pl.touk.nussknacker.ui.util.LocalNussknackerWithSingleModel

//Sample app to simplify local development.
@@ -13,7 +13,7 @@ object RunGenericModelLocally extends App {

val managerConfig = ConfigFactory.empty()
//For simplicity we use a stub here; one can add a real Flink implementation after adding the appropriate dependencies
-val provider: ProcessManagerProvider = new ProcessManagerProviderStub
+val provider: DeploymentManagerProvider = new DeploymentManagerProviderStub
LocalNussknackerWithSingleModel.run(modelData, provider, managerConfig, Set("Default"))

}
4 changes: 2 additions & 2 deletions engine/flink/management/periodic/README.md
@@ -10,10 +10,10 @@ process is scheduled to be run again according to the schedule.

## Usage

-- Implement `ProcessManagerProvider` using `PeriodicProcessManagerProvider`. Following components need to provided:
+- Implement `DeploymentManagerProvider` using `PeriodicDeploymentManagerProvider`. The following components need to be provided:
- Underlying engine, currently only Flink is supported.
- Optional `SchedulePropertyExtractor` to determine how to construct an instance of a periodic property. By default
a cron expression set in process properties is used to describe when a process should be run.
- Optional `EnrichDeploymentWithJarDataFactory` if you would like to, for example, extend process configuration,
by default nothing is done.
-- Add service provider with your `ProcessManagerProvider` implementation.
+- Add a service provider with your `DeploymentManagerProvider` implementation.
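A minimal wiring sketch for the steps above, relying on the defaults visible in the provider's constructor later in this diff (cron-based schedule extraction, no-op jar enrichment, empty listeners); `flinkProvider` is a hypothetical stand-in for the underlying Flink provider:

```scala
import pl.touk.nussknacker.engine.DeploymentManagerProvider
import pl.touk.nussknacker.engine.management.periodic.PeriodicDeploymentManagerProvider

// Wrap an existing provider; every omitted parameter falls back to its default.
def periodicProvider(flinkProvider: DeploymentManagerProvider): DeploymentManagerProvider =
  new PeriodicDeploymentManagerProvider(delegate = flinkProvider)
```

The wrapper registers under the delegate's name with a `Periodic` suffix (see `override def name` in the provider source below) and, like any provider, must itself be exposed through the ServiceLoader mechanism.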
@@ -23,16 +23,16 @@ import sttp.client.{NothingT, SttpBackend}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

-object PeriodicProcessManager {
-  def apply(delegate: ProcessManager,
+object PeriodicDeploymentManager {
+  def apply(delegate: DeploymentManager,
schedulePropertyExtractor: SchedulePropertyExtractor,
enrichDeploymentWithJarDataFactory: EnrichDeploymentWithJarDataFactory,
periodicBatchConfig: PeriodicBatchConfig,
flinkConfig: FlinkConfig,
originalConfig: Config,
modelData: ModelData,
listenerFactory: PeriodicProcessListenerFactory,
-additionalDeploymentDataProvider: AdditionalDeploymentDataProvider): PeriodicProcessManager = {
+additionalDeploymentDataProvider: AdditionalDeploymentDataProvider): PeriodicDeploymentManager = {
implicit val system: ActorSystem = ActorSystem("periodic-process-manager-provider")
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val backend: SttpBackend[Future, Nothing, NothingT] = AsyncHttpClientFutureBackend.usingConfigBuilder { builder =>
@@ -56,15 +56,15 @@ object PeriodicProcessManager {
Await.ready(backend.close(), 10 seconds)
()
}
-new PeriodicProcessManager(delegate, service, schedulePropertyExtractor, toClose)
+new PeriodicDeploymentManager(delegate, service, schedulePropertyExtractor, toClose)
}
}

-class PeriodicProcessManager(val delegate: ProcessManager,
+class PeriodicDeploymentManager(val delegate: DeploymentManager,
service: PeriodicProcessService,
schedulePropertyExtractor: SchedulePropertyExtractor,
toClose: () => Unit)
-(implicit val ec: ExecutionContext) extends ProcessManager with LazyLogging {
+(implicit val ec: ExecutionContext) extends DeploymentManager with LazyLogging {

override def deploy(processVersion: ProcessVersion,
deploymentData: DeploymentData,
@@ -3,32 +3,32 @@ package pl.touk.nussknacker.engine.management.periodic
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.TypeSpecificData
-import pl.touk.nussknacker.engine.api.deployment.ProcessManager
+import pl.touk.nussknacker.engine.api.deployment.DeploymentManager
import pl.touk.nussknacker.engine.api.queryablestate.QueryableClient
import pl.touk.nussknacker.engine.management.FlinkConfig
import pl.touk.nussknacker.engine.management.periodic.service.{AdditionalDeploymentDataProvider, DefaultAdditionalDeploymentDataProvider, EmptyListener, EmptyPeriodicProcessListenerFactory, PeriodicProcessListener, PeriodicProcessListenerFactory}
import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig
-import pl.touk.nussknacker.engine.{ModelData, ProcessManagerProvider}
+import pl.touk.nussknacker.engine.{ModelData, DeploymentManagerProvider}

-class PeriodicProcessManagerProvider(delegate: ProcessManagerProvider,
+class PeriodicDeploymentManagerProvider(delegate: DeploymentManagerProvider,
schedulePropertyExtractor: SchedulePropertyExtractor = CronSchedulePropertyExtractor(),
enrichDeploymentWithJarDataFactory: EnrichDeploymentWithJarDataFactory = EnrichDeploymentWithJarDataFactory.noOp,
listenerFactory: PeriodicProcessListenerFactory = EmptyPeriodicProcessListenerFactory,
additionalDeploymentDataProvider: AdditionalDeploymentDataProvider = DefaultAdditionalDeploymentDataProvider
-) extends ProcessManagerProvider with LazyLogging {
+) extends DeploymentManagerProvider with LazyLogging {

override def name: String = s"${delegate.name}Periodic"

-override def createProcessManager(modelData: ModelData, config: Config): ProcessManager = {
+override def createDeploymentManager(modelData: ModelData, config: Config): DeploymentManager = {
logger.info("Creating periodic scenario manager")
-val delegateProcessManager = delegate.createProcessManager(modelData, config)
+val delegateDeploymentManager = delegate.createDeploymentManager(modelData, config)

import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
val periodicBatchConfig = config.as[PeriodicBatchConfig]("processManager")
val periodicBatchConfig = config.as[PeriodicBatchConfig]("deploymentManager")
val flinkConfig = config.rootAs[FlinkConfig]
-PeriodicProcessManager(
-  delegate = delegateProcessManager,
+PeriodicDeploymentManager(
+  delegate = delegateDeploymentManager,
schedulePropertyExtractor = schedulePropertyExtractor,
enrichDeploymentWithJarDataFactory = enrichDeploymentWithJarDataFactory,
periodicBatchConfig = periodicBatchConfig,
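The `config.as[PeriodicBatchConfig]("deploymentManager")` call above relies on Ficus' `ArbitraryTypeReader` to map a HOCON subtree onto a case class. A self-contained sketch of that mechanism; `BatchConfigSketch` and its fields are hypothetical stand-ins, since `PeriodicBatchConfig`'s real fields are not shown in this diff:

```scala
import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._

// Hypothetical shape - the real PeriodicBatchConfig fields are not visible here.
final case class BatchConfigSketch(processingType: String, deployMaxRetries: Int)

val config = ConfigFactory.parseString(
  """
  deploymentManager {
    processingType: "streaming"
    deployMaxRetries: 3
  }
  """)

// Ficus derives a ValueReader for the case class and decodes the subtree.
val batchConfig: BatchConfigSketch = config.as[BatchConfigSketch]("deploymentManager")
```

Reading the whole subtree this way keeps the periodic settings in one named section of the deployment configuration.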