Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename ProcessManager to DeploymentManager #1921

Merged
merged 4 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ jobs:
run: tar xfz target.tgz
- name: Integration tests
shell: bash
run: ./ciRunSbt.sh flinkProcessManager/it:test engineStandalone/it:test processReports/it:test security/it:test
run: ./ciRunSbt.sh flinkDeploymentManager/it:test engineStandalone/it:test processReports/it:test security/it:test
slowTests:
name: Slow tests
runs-on: ubuntu-latest
Expand Down
18 changes: 9 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -339,14 +339,14 @@ lazy val dist = {
packageName in Universal := ("nussknacker" + "-" + version.value),
Keys.compile in Compile := (Keys.compile in Compile).dependsOn(
(assembly in Compile) in generic,
(assembly in Compile) in flinkProcessManager,
(assembly in Compile) in flinkDeploymentManager,
(assembly in Compile) in engineStandalone,
(assembly in Compile) in openapi,
(assembly in Compile) in sql,
).value,
mappings in Universal ++= Seq(
(crossTarget in generic).value / "genericModel.jar" -> "model/genericModel.jar",
(crossTarget in flinkProcessManager).value / "nussknacker-flink-manager.jar" -> "managers/nussknacker-flink-manager.jar",
(crossTarget in flinkDeploymentManager).value / "nussknacker-flink-manager.jar" -> "managers/nussknacker-flink-manager.jar",
(crossTarget in engineStandalone).value / "nussknacker-standalone-manager.jar" -> "managers/nussknacker-standalone-manager.jar",
(crossTarget in openapi).value / "openapi.jar" -> "components/openapi.jar",
(crossTarget in sql).value / "sql.jar" -> "components/sql.jar"
Expand Down Expand Up @@ -450,7 +450,7 @@ lazy val standaloneApp = (project in engine("standalone/app")).
dependsOn(engineStandalone, interpreter, httpUtils, testUtil % "test", standaloneUtil % "test")


lazy val flinkProcessManager = (project in engine("flink/management")).
lazy val flinkDeploymentManager = (project in engine("flink/management")).
configs(IntegrationTest).
settings(commonSettings).
settings(Defaults.itSettings).
Expand Down Expand Up @@ -483,7 +483,7 @@ lazy val flinkProcessManager = (project in engine("flink/management")).
httpUtils % "provided",
kafkaTestUtil % "it,test")

lazy val flinkPeriodicProcessManager = (project in engine("flink/management/periodic")).
lazy val flinkPeriodicDeploymentManager = (project in engine("flink/management/periodic")).
settings(commonSettings).
settings(assemblySettings("nussknacker-flink-periodic-manager.jar", includeScala = false): _*).
settings(
Expand All @@ -497,7 +497,7 @@ lazy val flinkPeriodicProcessManager = (project in engine("flink/management/peri
"com.cronutils" % "cron-utils" % cronParserV
)
}
).dependsOn(flinkProcessManager,
).dependsOn(flinkDeploymentManager,
interpreter % "provided",
api % "provided",
httpUtils % "provided",
Expand Down Expand Up @@ -1034,7 +1034,7 @@ lazy val ui = (project in file("ui/server"))
"org.slf4j" % "log4j-over-slf4j" % slf4jV,
"com.carrotsearch" % "java-sizeof" % "0.0.5",

//It's needed by flinkProcessManager which has disabled includingScala
//It's needed by flinkDeploymentManager which has disabled includingScala
"org.scala-lang" % "scala-compiler" % scalaVersion.value,
"org.scala-lang" % "scala-reflect" % scalaVersion.value,

Expand All @@ -1057,7 +1057,7 @@ lazy val ui = (project in file("ui/server"))
//TODO: this is unfortunatelly needed to run without too much hassle in Intellij...
//provided dependency of kafka is workaround for Idea, which is not able to handle test scope on module dependency
//otherwise it is (wrongly) added to classpath when running UI from Idea
flinkProcessManager % "provided" ,
flinkDeploymentManager % "provided" ,
kafka % "provided",
engineStandalone % "provided"
)
Expand Down Expand Up @@ -1100,7 +1100,7 @@ lazy val bom = (project in file("bom"))
).dependsOn(modules.map(k => k:ClasspathDep[ProjectReference]):_*)

lazy val modules = List[ProjectReference](
engineStandalone, standaloneApp, flinkProcessManager, flinkPeriodicProcessManager, standaloneSample, flinkManagementSample, managementJavaSample, generic,
engineStandalone, standaloneApp, flinkDeploymentManager, flinkPeriodicDeploymentManager, standaloneSample, flinkManagementSample, managementJavaSample, generic,
openapi, process, interpreter, benchmarks, kafka, avroFlinkUtil, kafkaFlinkUtil, kafkaTestUtil, util, testUtil, flinkUtil, flinkModelUtil,
flinkTestUtil, standaloneUtil, standaloneApi, api, security, flinkApi, processReports, httpUtils, queryableState,
restmodel, listenerApi, ui, sql
Expand Down Expand Up @@ -1139,4 +1139,4 @@ lazy val root = (project in file("."))
)

addCommandAlias("assemblySamples", ";flinkManagementSample/assembly;standaloneSample/assembly;generic/assembly")
addCommandAlias("assemblyEngines", ";flinkProcessManager/assembly;engineStandalone/assembly")
addCommandAlias("assemblyDeploymentManagers", ";flinkDeploymentManager/assembly;engineStandalone/assembly")
2 changes: 1 addition & 1 deletion docs/Architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Nussknacker consists of three parts:


##Engine
Engine consists of various modules that enable creation of processes building blocks in UI and interpretation of process diagrams. Engines implement `ProcessManagerProvider`.
Engine consists of various modules that enable creation of processes building blocks in UI and interpretation of process diagrams. Engines implement `DeploymentManagerProvider`.

##UI
The **ui** application is a simple application written using Scala, Akka Http and Slick on the backend side and ReactJS on the front. Processes, their history, comments and other metadata are persisted in relational database (by default it's simple embedded H2). UI communicates with Apache Flink cluster using embedded Flink client.
Expand Down
1 change: 1 addition & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Nussknacker versions
* Various naming changes:
* [#1917](https://github.com/TouK/nussknacker/pull/1917) configuration of `engineConfig` to `deploymentConfig`
* [#1911](https://github.com/TouK/nussknacker/pull/1911) Rename `process` to `scenario`, `subprocess` to `fragment` in messages at backend and some test cases names
* [#1921](https://github.com/TouK/nussknacker/pull/1921) `ProcessManager` to `DeploymentManager`


0.3.1 (not released yet)
Expand Down
4 changes: 2 additions & 2 deletions docs/Engines.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
Engines
=======
Nussknacker was created as GUI for Flink. However, it's also possible to use it to connect to other runtimes.
```ProcessManager``` and ```ProcessManagerProvider``` interfaces were created to facilitate this.
```DeploymentManager``` and ```DeploymentManagerProvider``` interfaces were created to facilitate this.
In particular, we provide experimental standalone engine, which allows using Nussknacker as method of creating REST APIs

To create/customize Nussknacker engine you have to:
- Implement ```ProcessManagerProvider``` interface and register it with ServiceLoader mechanism.
- Implement ```DeploymentManagerProvider``` interface and register it with ServiceLoader mechanism.
Each provider should have unique ```name```.
- Put implementation with all needed libraries on Nussknacker classpath.
- Configure appropriate model to use your process manager.
Expand Down
1 change: 1 addition & 0 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ that will be hidden before parameter's evaluation
* [#1303](https://github.com/TouK/nussknacker/pull/1303) TypedObjectTypingResult has a new field: additionalInfo
* Various naming changes:
* [#1917](https://github.com/TouK/nussknacker/pull/1917) configuration of `engineConfig` to `deploymentConfig`
* [#1921](https://github.com/TouK/nussknacker/pull/1921) `ProcessManager` to `DeploymentManager`

## In version 0.2.0

Expand Down
2 changes: 1 addition & 1 deletion docs/operations_guide/Operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ It’s possible to configure Nussknacker installation to use other metrics setup

### Common Flink configuration issues

Nussknacker assumes that the Flink Session Cluster is used (it should be possible to write own, custom `ProcessManager` to deploy with Job/Application mode,
Nussknacker assumes that the Flink Session Cluster is used (it should be possible to write own, custom `DeploymentManager` to deploy with Job/Application mode,
but this is out of scope of this guide).

It usually happens (especially for large deployments) that the Flink cluster used with Nusssknacker has quite a lot of jobs (each representing one scenario), many of them are quite small in terms of needed resources - this is different to usual Flink setup, where a cluster has one or few jobs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.process.ProcessName

import scala.concurrent.Future

trait ProcessManager extends AutoCloseable {
trait DeploymentManager extends AutoCloseable {

//TODO: savepointPath is very flink specific, how can we handle that differently?
def deploy(processVersion: ProcessVersion, deploymentData: DeploymentData, processDeploymentData: ProcessDeploymentData, savepointPath: Option[String]) : Future[Option[ExternalDeploymentId]]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package pl.touk.nussknacker.genericmodel

import com.typesafe.config.ConfigFactory
import pl.touk.nussknacker.engine.ProcessManagerProvider
import pl.touk.nussknacker.engine.testing.{LocalModelData, ProcessManagerProviderStub}
import pl.touk.nussknacker.engine.DeploymentManagerProvider
import pl.touk.nussknacker.engine.testing.{LocalModelData, DeploymentManagerProviderStub}
import pl.touk.nussknacker.ui.util.LocalNussknackerWithSingleModel

//Sample app to simplify local development.
Expand All @@ -13,7 +13,7 @@ object RunGenericModelLocally extends App {

val managerConfig = ConfigFactory.empty()
//For simplicity we use stub here, one can add real Flink implementation after add appropriate dependencies
val provider: ProcessManagerProvider = new ProcessManagerProviderStub
val provider: DeploymentManagerProvider = new DeploymentManagerProviderStub
LocalNussknackerWithSingleModel.run(modelData, provider, managerConfig, Set("Default"))

}
4 changes: 2 additions & 2 deletions engine/flink/management/periodic/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ process is scheduled to be run again according to the schedule.

## Usage

- Implement `ProcessManagerProvider` using `PeriodicProcessManagerProvider`. Following components need to provided:
- Implement `DeploymentManagerProvider` using `PeriodicDeploymentManagerProvider`. Following components need to provided:
- Underlying engine, currently only Flink is supported.
- Optional `SchedulePropertyExtractor` to determine how to construct an instance of a periodic property. By default
a cron expression set in process properties is used to describe when a process should be run.
- Optional `EnrichDeploymentWithJarDataFactory` if you would like to, for example, extend process configuration,
by default nothing is done.
- Add service provider with your `ProcessManagerProvider` implementation.
- Add service provider with your `DeploymentManagerProvider` implementation.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ import sttp.client.{NothingT, SttpBackend}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object PeriodicProcessManager {
def apply(delegate: ProcessManager,
object PeriodicDeploymentManager {
def apply(delegate: DeploymentManager,
schedulePropertyExtractor: SchedulePropertyExtractor,
enrichDeploymentWithJarDataFactory: EnrichDeploymentWithJarDataFactory,
periodicBatchConfig: PeriodicBatchConfig,
flinkConfig: FlinkConfig,
originalConfig: Config,
modelData: ModelData,
listenerFactory: PeriodicProcessListenerFactory,
additionalDeploymentDataProvider: AdditionalDeploymentDataProvider): PeriodicProcessManager = {
additionalDeploymentDataProvider: AdditionalDeploymentDataProvider): PeriodicDeploymentManager = {
implicit val system: ActorSystem = ActorSystem("periodic-process-manager-provider")
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val backend: SttpBackend[Future, Nothing, NothingT] = AsyncHttpClientFutureBackend.usingConfigBuilder { builder =>
Expand All @@ -56,15 +56,15 @@ object PeriodicProcessManager {
Await.ready(backend.close(), 10 seconds)
()
}
new PeriodicProcessManager(delegate, service, schedulePropertyExtractor, toClose)
new PeriodicDeploymentManager(delegate, service, schedulePropertyExtractor, toClose)
}
}

class PeriodicProcessManager(val delegate: ProcessManager,
class PeriodicDeploymentManager(val delegate: DeploymentManager,
service: PeriodicProcessService,
schedulePropertyExtractor: SchedulePropertyExtractor,
toClose: () => Unit)
(implicit val ec: ExecutionContext) extends ProcessManager with LazyLogging {
(implicit val ec: ExecutionContext) extends DeploymentManager with LazyLogging {

override def deploy(processVersion: ProcessVersion,
deploymentData: DeploymentData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,32 @@ package pl.touk.nussknacker.engine.management.periodic
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.TypeSpecificData
import pl.touk.nussknacker.engine.api.deployment.ProcessManager
import pl.touk.nussknacker.engine.api.deployment.DeploymentManager
import pl.touk.nussknacker.engine.api.queryablestate.QueryableClient
import pl.touk.nussknacker.engine.management.FlinkConfig
import pl.touk.nussknacker.engine.management.periodic.service.{AdditionalDeploymentDataProvider, DefaultAdditionalDeploymentDataProvider, EmptyListener, EmptyPeriodicProcessListenerFactory, PeriodicProcessListener, PeriodicProcessListenerFactory}
import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig
import pl.touk.nussknacker.engine.{ModelData, ProcessManagerProvider}
import pl.touk.nussknacker.engine.{ModelData, DeploymentManagerProvider}

class PeriodicProcessManagerProvider(delegate: ProcessManagerProvider,
class PeriodicDeploymentManagerProvider(delegate: DeploymentManagerProvider,
schedulePropertyExtractor: SchedulePropertyExtractor = CronSchedulePropertyExtractor(),
enrichDeploymentWithJarDataFactory: EnrichDeploymentWithJarDataFactory = EnrichDeploymentWithJarDataFactory.noOp,
listenerFactory: PeriodicProcessListenerFactory = EmptyPeriodicProcessListenerFactory,
additionalDeploymentDataProvider: AdditionalDeploymentDataProvider = DefaultAdditionalDeploymentDataProvider
) extends ProcessManagerProvider with LazyLogging {
) extends DeploymentManagerProvider with LazyLogging {

override def name: String = s"${delegate.name}Periodic"

override def createProcessManager(modelData: ModelData, config: Config): ProcessManager = {
override def createDeploymentManager(modelData: ModelData, config: Config): DeploymentManager = {
logger.info("Creating periodic scenario manager")
val delegateProcessManager = delegate.createProcessManager(modelData, config)
val delegateDeploymentManager = delegate.createDeploymentManager(modelData, config)

import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
val periodicBatchConfig = config.as[PeriodicBatchConfig]("processManager")
val periodicBatchConfig = config.as[PeriodicBatchConfig]("deploymentManager")
val flinkConfig = config.rootAs[FlinkConfig]
PeriodicProcessManager(
delegate = delegateProcessManager,
PeriodicDeploymentManager(
delegate = delegateDeploymentManager,
schedulePropertyExtractor = schedulePropertyExtractor,
enrichDeploymentWithJarDataFactory = enrichDeploymentWithJarDataFactory,
periodicBatchConfig = periodicBatchConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import java.time.{Clock, LocalDateTime}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

class PeriodicProcessService(delegateProcessManager: ProcessManager,
class PeriodicProcessService(delegateDeploymentManager: DeploymentManager,
jarManager: JarManager,
scheduledProcessesRepository: PeriodicProcessesRepository,
periodicProcessListener: PeriodicProcessListener,
Expand Down Expand Up @@ -91,7 +91,7 @@ class PeriodicProcessService(delegateProcessManager: ProcessManager,

//Currently we don't allow simultaneous runs of one scenario - only sequential, so if other schedule kicks in, it'll have to wait
private def checkIfNotRunning(toDeploy: PeriodicProcessDeployment): Future[Option[PeriodicProcessDeployment]] = {
delegateProcessManager.findJobStatus(toDeploy.periodicProcess.processVersion.processName).map {
delegateDeploymentManager.findJobStatus(toDeploy.periodicProcess.processVersion.processName).map {
case Some(state) if state.isDeployed =>
logger.debug(s"Deferring run of ${toDeploy.display} as scenario is currently running")
None
Expand All @@ -102,7 +102,7 @@ class PeriodicProcessService(delegateProcessManager: ProcessManager,
def handleFinished: Future[Unit] = {

def handleSingleProcess(deployedProcess: PeriodicProcessDeployment): Future[Unit] = {
delegateProcessManager.findJobStatus(deployedProcess.periodicProcess.processVersion.processName).flatMap { state =>
delegateDeploymentManager.findJobStatus(deployedProcess.periodicProcess.processVersion.processName).flatMap { state =>
handleFinishedAction(deployedProcess, state)
.flatMap { needsReschedule =>
if (needsReschedule) reschedule(deployedProcess, state) else scheduledProcessesRepository.monad.pure(()).emptyCallback
Expand Down Expand Up @@ -170,7 +170,7 @@ class PeriodicProcessService(delegateProcessManager: ProcessManager,
}

def deactivate(processName: ProcessName): Future[Unit] = for {
status <- delegateProcessManager.findJobStatus(processName)
status <- delegateDeploymentManager.findJobStatus(processName)
maybePeriodicDeployment <- getLatestDeployment(processName)
actionResult <- maybePeriodicDeployment match {
case Some(periodicDeployment) => handleFinishedAction(periodicDeployment, status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.api.deployment.{DeploymentData, ExternalDeploy
import pl.touk.nussknacker.engine.management.periodic.model.DeploymentWithJarData
import pl.touk.nussknacker.engine.management.periodic.{EnrichDeploymentWithJarData, JarManager, PeriodicBatchConfig, model}
import pl.touk.nussknacker.engine.management.rest.{FlinkClient, HttpFlinkClient}
import pl.touk.nussknacker.engine.management.{FlinkConfig, FlinkModelJar, FlinkProcessManager, FlinkStreamingRestManager}
import pl.touk.nussknacker.engine.management.{FlinkConfig, FlinkModelJar, FlinkDeploymentManager, FlinkStreamingRestManager}
import pl.touk.nussknacker.engine.modelconfig.InputConfigDuringExecution
import sttp.client.{NothingT, SttpBackend}

Expand Down Expand Up @@ -70,7 +70,7 @@ private[periodic] class FlinkJarManager(flinkClient: FlinkClient,
val processVersion = deploymentWithJarData.processVersion
logger.info(s"Deploying scenario ${processVersion.processName.value}, version id: ${processVersion.versionId} and jar: ${deploymentWithJarData.jarFileName}")
val jarFile = jarsDir.resolve(deploymentWithJarData.jarFileName).toFile
val args = FlinkProcessManager.prepareProgramArgs(deploymentWithJarData.modelConfig,
val args = FlinkDeploymentManager.prepareProgramArgs(deploymentWithJarData.modelConfig,
processVersion,
deploymentData,
GraphProcess(deploymentWithJarData.processJson))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.management.FlinkProcessStateDefinitionManager

import scala.concurrent.Future

class ProcessManagerStub extends ProcessManager {
class DeploymentManagerStub extends DeploymentManager {

var jobStatus: Option[ProcessState] = None

Expand Down
Loading