Rename processTypes to scenarioTypes #1910

Merged · 6 commits · Jul 15, 2021
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
@@ -261,7 +261,7 @@ jobs:
CYPRESS_BASE_URL: http://localhost:8083
shell: bash
run: |
docker run -d -p 8083:8080 -e CONFIG_FORCE_processTypes_streaming_engineConfig_type=stub -e NUSSKNACKER_CONFIG_FILE=/opt/nussknacker/conf/dev-application.conf --name nussknacker_e2e_fe touk/nussknacker:$NUSSKNACKER_VERSION
docker run -d -p 8083:8080 -e CONFIG_FORCE_scenarioTypes_streaming_engineConfig_type=stub -e NUSSKNACKER_CONFIG_FILE=/opt/nussknacker/conf/dev-application.conf --name nussknacker_e2e_fe touk/nussknacker:$NUSSKNACKER_VERSION
cd ui/client
npx wait-on $CYPRESS_BASE_URL && npm run test:e2e
docker kill nussknacker_e2e_fe
2 changes: 1 addition & 1 deletion demo/docker/nussknacker/nussknacker.conf
@@ -1,7 +1,7 @@
#This configuration augments and overrides configuration in docker image
#Here we configure OpenAPI based enricher, which is implemented by python service in customerservice
{
processTypes.streaming.modelConfig {
scenarioTypes.streaming.modelConfig {
#We add additional jar to model classPath
classPath += "components/openapi.jar"
components.openAPI {
14 changes: 7 additions & 7 deletions docs/Configuration.md
@@ -29,7 +29,7 @@ usersFile: "./conf/users.conf"
environment: "demo"
attachmentsPath: "/tmp/touk/esp-frontend/attachments"

processTypes {
scenarioTypes {
streaming {
engineConfig {
type: "flinkStreaming"
@@ -91,7 +91,7 @@ Detailed rules are described in [documentation](https://github.com/lightbend/con
```
you can override them in ```application.conf``` like this:
```hocon
processTypes {
scenarioTypes {
type1 {
modelConfig {
timeout: 20s
@@ -116,7 +116,7 @@ All configurations can also be overridden with environmental variables. Please c
In particular, Nussknacker docker image is executed with ```-Dconfig.override_with_env_vars=true```
For example, to override samples above you would have to define:
```shell script
CONFIG_FORCE_processTypes_type1_modelConfig_timeout=30s
CONFIG_FORCE_scenarioTypes_type1_modelConfig_timeout=30s
CONFIG_FORCE_environmentAlert_content="MY environment"
```
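As a quick sanity check that such an override is actually picked up (an editor's illustration, not part of the diff; the `EnvOverrideCheck` object name is invented), the configuration can be loaded through the Lightbend Config library that Nussknacker uses:

```scala
import com.typesafe.config.ConfigFactory

// Launch with -Dconfig.override_with_env_vars=true and the environment variable
// CONFIG_FORCE_scenarioTypes_type1_modelConfig_timeout=30s set. Lightbend Config
// maps single underscores to dots, so the override lands on the path read below.
object EnvOverrideCheck extends App {
  val config = ConfigFactory.load()
  println(config.getString("scenarioTypes.type1.modelConfig.timeout")) // expected: 30s
}
```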

@@ -133,7 +133,7 @@ categoriesConfig: {
"fraud": "streaming",
}
```
For each category you have to define its processing type (`streaming` in examples above). You can read about processing
For each category you have to define its scenario type (`streaming` in examples above). You can read about processing
types and their configurations below.

### Process Toolbar Configuration
@@ -332,11 +332,11 @@ akka {
* `environment` - key of environment (used e.g. for alerts) - e.g. test or production
* `attachmentsPath` - location on disk where attachments will be stored

##Processing types
##Scenario types
One installation of Nussknacker can handle many different processing engines - currently the main supported engine is
Flink in streaming mode. Processing engines are defined in `processTypes` section. You can e.g. have two processing
Flink in streaming mode. Processing engines are defined in `scenarioTypes` section. You can e.g. have two processing
types pointing to separate Flink clusters. Each processing engine has its name (e.g. `flinkStreaming`).
Processing type configuration consists of two main parts:
Scenario type configuration consists of two main parts:
* engine configuration
* model configuration
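
For illustration only (not taken from this diff, and the jar path is a placeholder), a minimal scenario type with both parts can be expressed and parsed with the Lightbend Config library like this:

```scala
import com.typesafe.config.{Config, ConfigFactory}

// engineConfig: where and how scenarios are deployed (here a Flink cluster);
// modelConfig: the model the scenarios are built against (placeholder classPath).
val scenarioType: Config = ConfigFactory.parseString(
  """scenarioTypes {
    |  streaming {
    |    engineConfig { type: "flinkStreaming" }
    |    modelConfig { classPath: ["model/placeholder.jar"] }
    |  }
    |}""".stripMargin)
```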

2 changes: 1 addition & 1 deletion docs/Engines.md
@@ -12,7 +12,7 @@ To create/customize Nussknacker engine you have to:

To configure process category to use particular engine, see following configuration:
```
processTypes {
scenarioTypes {
"streaming": {
type: ... //name of engine, e.g. "flinkStreaming"
//... - additional engine config (e.g. location of Flink cluster, etc.)
1 change: 1 addition & 0 deletions docs/MigrationGuide.md
@@ -151,6 +151,7 @@ To see biggest differences please consult the [changelog](Changelog.md).
* [#1886](https://github.com/TouK/nussknacker/pull/1886) aggregate-sliding with emitWhenEventLeft = true, aggregate-tumbling and aggregate-session components now
don't emit the full context of variables that were available before the node (for performance reasons and because it wasn't obvious which context is emitted).
If you want to emit some information other than aggregated value and key (available via the new `#key` variable), you should use `#AGG.map` expression in `aggregateBy`.
* [#1910](https://github.com/TouK/nussknacker/pull/1910) `processTypes` renamed to `scenarioTypes`. You can still use the old `processTypes` configuration; it will be removed in version `0.5.0`.
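
If you would rather migrate a configuration file programmatically than rename the key by hand, a minimal sketch (illustrative only; it reuses the Typesafe Config API already on Nussknacker's classpath, and the streaming entry is just a stand-in) could re-root the old subtree:

```scala
import com.typesafe.config.{ConfigFactory, ConfigRenderOptions}

// Parse a legacy config, move everything under processTypes to scenarioTypes
// and render it back out.
val legacy = ConfigFactory.parseString(
  """processTypes {
    |  streaming {
    |    engineConfig { type: "flinkStreaming" }
    |  }
    |}""".stripMargin)

val migrated = legacy.getConfig("processTypes").atKey("scenarioTypes")
println(migrated.root().render(ConfigRenderOptions.defaults().setOriginComments(false)))
```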

## In version 0.3.0

@@ -27,9 +27,9 @@ abstract class ModelConfigLoader extends Serializable {
* [[resolveConfig]])
*
* Method used for performance reasons to reduce serialized configuration size inside deployed processes. By default
* config from main nussknacker file at path: processTypes.{type_name}.modelConfig is passed unchanged.
* config from main nussknacker file at path: scenarioTypes.{type_name}.modelConfig is passed unchanged.
*
* @param inputConfig configuration from processTypes.{type_name}.modelConfig
* @param inputConfig configuration from scenarioTypes.{type_name}.modelConfig
* @return config part that is later passed to a running process (see e.g. FlinkProcessCompiler)
*/
def resolveInputConfigDuringExecution(inputConfig: Config, classLoader: ClassLoader): InputConfigDuringExecution = {
2 changes: 1 addition & 1 deletion nussknacker-dist/src/universal/conf/application.conf
@@ -8,7 +8,7 @@ categoriesConfig: {
"Default": "streaming"
}

processTypes {
scenarioTypes {
"streaming": {
engineConfig: ${flinkEngineConfig}
modelConfig: {
2 changes: 1 addition & 1 deletion nussknacker-dist/src/universal/conf/dev-application.conf
@@ -30,7 +30,7 @@ flinkEngineConfig {
queryableStateProxyUrl: ${?FLINK_QUERYABLE_STATE_PROXY_URL}
}

processTypes {
scenarioTypes {
"streaming": {
engineConfig: ${flinkEngineConfig}
modelConfig: {
2 changes: 1 addition & 1 deletion ui/client/package.json
@@ -18,7 +18,7 @@
"start:backend-docker": "npm run clean-translations && start-server-and-test start-backend:docker http-get://localhost:8080/static/main.html start",
"start:backend-staging": "npm run clean-translations && BACKEND_DOMAIN=https://staging.nussknacker.io webpack serve",
"start:backend-demo": "npm run clean-translations && BACKEND_DOMAIN=https://demo.nussknacker.io webpack serve",
"start-backend:docker": "docker run -i -p 8080:8080 -e CONFIG_FORCE_processTypes_streaming_engineConfig_type=stub -e NUSSKNACKER_CONFIG_FILE=/opt/nussknacker/conf/dev-application.conf --pull always -P touk/nussknacker:staging-latest",
"start-backend:docker": "docker run -i -p 8080:8080 -e CONFIG_FORCE_scenarioTypes_streaming_engineConfig_type=stub -e NUSSKNACKER_CONFIG_FILE=/opt/nussknacker/conf/dev-application.conf --pull always -P touk/nussknacker:staging-latest",
"pretest": "npm run check",
"test:unit": "jest",
"test:e2e": "cypress run",
@@ -59,7 +59,7 @@ class DefinitionResources(modelDataProvider: ProcessingTypeDataProvider[ModelDat
}
} ~ dictResources.route(processingTypeData.modelData)
}.getOrElse {
complete(HttpResponse(status = StatusCodes.NotFound, entity = s"Processing type: $processingType not found"))
complete(HttpResponse(status = StatusCodes.NotFound, entity = s"Scenario type: $processingType not found"))
}
}
}
@@ -77,7 +77,7 @@ class QueryableStateResources(typeToConfig: ProcessingTypeDataProvider[Processin

private def fetchState(processingType: String, jobId: String, queryName: String, key: Option[String]): Future[String] = {
typeToConfig.forTypeUnsafe(processingType).queryableClient match {
case None => Future.failed(new Exception(s"Queryable client not found for processing type $processingType"))
case None => Future.failed(new Exception(s"Queryable client not found for scenario type $processingType"))
case Some(queryableClient) =>
key match {
case Some(k) => queryableClient.fetchJsonState(jobId, queryName, k)
@@ -1,12 +1,12 @@
package pl.touk.nussknacker.ui.process.processingtypedata

import pl.touk.nussknacker.engine.{ProcessManagerProvider, ProcessingTypeConfig, ProcessingTypeData}
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ValueReader
import pl.touk.nussknacker.engine.ProcessingTypeData.ProcessingType
import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader
import pl.touk.nussknacker.engine.{ProcessManagerProvider, ProcessingTypeConfig, ProcessingTypeData}

object ProcessingTypeDataReader extends LazyLogging {

@@ -29,6 +29,15 @@ object ProcessingTypeDataReader extends LazyLogging {
key -> config.as[ProcessingTypeConfig](key)(ProcessingTypeConfig.reader)
}.toMap
}
config.as[Map[String, ProcessingTypeConfig]]("processTypes")

val processTypesOption = config.getAs[Map[String, ProcessingTypeConfig]]("processTypes")
val scenarioTypesOption = config.getAs[Map[String, ProcessingTypeConfig]]("scenarioTypes")
(scenarioTypesOption, processTypesOption) match {
case (Some(scenarioTypes), _) => scenarioTypes
case (None, Some(processTypes)) =>
logger.warn("ScenarioTypes configuration is missing - falling back to old processTypes configuration - processTypes will be removed in next version")
processTypes
case (None, None) => throw new RuntimeException("No scenario types configuration provided")
}
}
}
@@ -27,12 +27,12 @@ class BasicProcessingTypeDataReload(loadMethod: () => ProcessingTypeDataProvider
@volatile private var current: ProcessingTypeDataProvider[ProcessingTypeData] = loadMethod()

override def reloadAll(): Unit = synchronized {
logger.info("Reloading processing type data")
logger.info("Reloading scenario type data")
val old = current
current = loadMethod()
logger.info("Processing type data reloaded, closing old models")
logger.info("Scenario type data reloaded, closing old models")
old.all.values.foreach(_.close())
logger.info("Processing type data reloading finished")
logger.info("Scenario type data reloading finished")
}
}

2 changes: 1 addition & 1 deletion ui/server/src/test/resources/ui.conf
@@ -37,7 +37,7 @@ customProcesses {
"customProcess1": "pl.touk.custom.NotExistingClass"
}

processTypes {
scenarioTypes {
"streaming" {
engineConfig {
restUrl: "http://localhost:8081"
@@ -18,13 +18,13 @@ class DefinitionResourcesSpec extends FunSpec with ScalatestRouteTest with FailF

private implicit final val string: FromEntityUnmarshaller[String] = Unmarshaller.stringUnmarshaller.forContentTypes(ContentTypeRange.*)

it("should handle missing processing type") {
it("should handle missing scenario type") {
getProcessDefinitionData("foo", Map.empty[String, Long].asJson) ~> check {
status shouldBe StatusCodes.NotFound
}
}

it("should return definition data for existing processing type") {
it("should return definition data for existing scenario type") {
getProcessDefinitionData(existingProcessingType, Map.empty[String, Long].asJson) ~> check {
status shouldBe StatusCodes.OK

@@ -42,7 +42,7 @@ class ConfigurationTest extends FunSuite with Matchers {

//to be able to run this test:
//add -Dconfig.override_with_env_vars=true to VM parameters
//set env variable: CONFIG_FORCE_processTypes_streaming_modelConfig_testProperty=testValue
//set env variable: CONFIG_FORCE_scenarioTypes_streaming_modelConfig_testProperty=testValue
ignore("check if env properties are used/passed") {
modelDataConfig.getString("testProperty") shouldBe "testValue"
modelData.inputConfigDuringExecution.config.getString("testProperty") shouldBe "testValue"
@@ -0,0 +1,90 @@
package pl.touk.nussknacker.ui.process.processingtypedata

import com.typesafe
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.scalatest.FunSuite
import org.scalatest.Matchers.{convertToAnyShouldWrapper, include}

class ProcessingTypeDataReaderSpec extends FunSuite {

private val oldConfiguration: Config = ConfigFactory.parseString(
"""
|processTypes {
| "streaming" {
| engineConfig {
| jobManagerTimeout: 1m
| restUrl: "http://localhost:8081"
| queryableStateProxyUrlMissing: "localhost:9123"
| type: "flinkStreaming"
| }
|
| modelConfig {
| classPath: ["engine/flink/management/sample/target/scala-2.12/managementSample.jar"]
| }
| }
|}
|""".stripMargin
)

import scala.collection.JavaConverters._

test("should load old processTypes configuration") {
val processTypes = ProcessingTypeDataReader.loadProcessingTypeData(oldConfiguration)

processTypes.all.size shouldBe 1
processTypes.all.keys.take(1) shouldBe Set("streaming")
}

test("should optionally load scenarioTypes configuration") {
val configuration = ConfigFactory.parseMap(Map[String, Any](
"scenarioTypes.newStreamingScenario" -> ConfigValueFactory.fromAnyRef(oldConfiguration.getConfig("processTypes.streaming").root())
).asJava)

val config = oldConfiguration.withFallback(configuration).resolve()

val processTypes = ProcessingTypeDataReader.loadProcessingTypeData(config)

processTypes.all.size shouldBe 1
processTypes.all.keys.take(1) shouldBe Set("newStreamingScenario")
}

test("should throw when required configuration is missing") {
val configuration = ConfigFactory.parseString(
"""
|scenarioTypes {
| "streaming" {
| engineConfig {
| restUrlMissing: "http://localhost:8081"
| type: "flinkStreaming"
| }
|
| modelConfig {
| classPath: ["engine/flink/management/sample/target/scala-2.12/managementSample.jar"]
| }
| }
|}
|""".stripMargin
)

val config = configuration.resolve()

intercept[typesafe.config.ConfigException] {
ProcessingTypeDataReader.loadProcessingTypeData(config)
}.getMessage should include("No configuration setting found for key 'root.restUrl'")
}

test("should throw when no configuration is provided") {
val configuration = ConfigFactory.parseString(
"""
|test {}
|""".stripMargin
)

val config = configuration.resolve()

intercept[RuntimeException] {
ProcessingTypeDataReader.loadProcessingTypeData(config)
}.getMessage should include("No scenario types configuration provided")
}

}
@@ -8,5 +8,5 @@ object ConfigWithScalaVersion {

val config: Config = ScalaMajorVersionConfig.configWithScalaMajorVersion(ConfigFactory.parseResources("ui.conf"))

val streamingProcessTypeConfig: Config = config.getConfig("processTypes.streaming")
val streamingProcessTypeConfig: Config = config.getConfig("scenarioTypes.streaming")
}