diff --git a/demo/docker/docker-compose.yml b/demo/docker/docker-compose.yml index 93927cd8f39..27fea15852a 100644 --- a/demo/docker/docker-compose.yml +++ b/demo/docker/docker-compose.yml @@ -10,7 +10,6 @@ services: #multiple, comma separated, config files can be used. They will be merged in order, via HOCON fallback mechanism #https://github.com/lightbend/config/blob/master/HOCON.md#config-object-merging-and-file-merging NUSSKNACKER_CONFIG_FILE: ${NUSSKNACKER_CONFIG_FILE-/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/nussknacker.conf} - COUNTS_URL: http://influxdb:8086/query JDK_JAVA_OPTIONS: -Xmx256M FLINK_ROCKSDB_CHECKPOINT_DATA_URI: file:///opt/flink/data/rocksdb-checkpoints volumes: diff --git a/demo/docker/nussknacker/nussknacker.conf b/demo/docker/nussknacker/nussknacker.conf index 6a7cb79347c..4a2f508dbd4 100644 --- a/demo/docker/nussknacker/nussknacker.conf +++ b/demo/docker/nussknacker/nussknacker.conf @@ -2,7 +2,8 @@ #Here we configure OpenAPI based enricher, which is implemented by python service in customerservice { processTypes.streaming.modelConfig { - classPath: ["model/genericModel.jar", "components/openapi.jar"] + #We add additional jar to model classPath + classPath += "components/openapi.jar" components.openAPI { url: "http://customerservice:5000/swagger" rootUrl: "http://customerservice:5000" diff --git a/engine/flink/generic/src/main/resources/defaultModelConfig.conf b/engine/flink/generic/src/main/resources/defaultModelConfig.conf index f45f2c85692..f7d28b3e92c 100644 --- a/engine/flink/generic/src/main/resources/defaultModelConfig.conf +++ b/engine/flink/generic/src/main/resources/defaultModelConfig.conf @@ -6,7 +6,6 @@ } timeout: 10s - delayBetweenAttempts: 10s checkpointConfig { checkpointInterval: 10m } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala index b2a0d1bbbd7..c9259e3228e 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkConfig.scala @@ -1,10 +1,9 @@ package pl.touk.nussknacker.engine.management -import com.typesafe.config.Config - import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.DurationInt -case class FlinkConfig(jobManagerTimeout: FiniteDuration, +case class FlinkConfig(restUrl: String, queryableStateProxyUrl: Option[String], - restUrl: String, - shouldVerifyBeforeDeploy: Option[Boolean]) + jobManagerTimeout: FiniteDuration = 1 minute, + shouldVerifyBeforeDeploy: Boolean = true) diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala index 0bfdc6ae463..627cf8ceb45 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala @@ -20,7 +20,7 @@ import scala.concurrent.{Await, Future} class FlinkRestManager(config: FlinkConfig, modelData: ModelData, mainClassName: String) (implicit backend: SttpBackend[Future, Nothing, NothingT]) - extends FlinkProcessManager(modelData, config.shouldVerifyBeforeDeploy.getOrElse(true), mainClassName) with LazyLogging { + extends FlinkProcessManager(modelData, config.shouldVerifyBeforeDeploy, mainClassName) with LazyLogging { protected lazy val jarFile: File = new FlinkModelJar().buildJobJar(modelData) diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/rest/HttpFlinkClient.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/rest/HttpFlinkClient.scala index 3dad4ef5507..a63df3dfbc5 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/rest/HttpFlinkClient.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/rest/HttpFlinkClient.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.engine.management.rest import com.typesafe.scalalogging.LazyLogging -import io.circe.Error import pl.touk.nussknacker.engine.api.CirceUtil import pl.touk.nussknacker.engine.api.deployment.{ExternalDeploymentId, SavepointResult} import pl.touk.nussknacker.engine.management.rest.flinkRestModel._ diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala index 8943b2a8919..066a9d12ccd 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala @@ -26,7 +26,7 @@ import scala.concurrent.duration._ //TODO move some tests to FlinkHttpClientTest class FlinkRestManagerSpec extends FunSuite with Matchers with PatientScalaFutures { - private val config = FlinkConfig(10 minute, None, "http://test.pl", None) + private val config = FlinkConfig("http://test.pl", None) private var statuses: List[JobOverview] = List() diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/rest/FlinkHttpClientTest.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/rest/FlinkHttpClientTest.scala index 68535c85512..e5c817178fb 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/rest/FlinkHttpClientTest.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/rest/FlinkHttpClientTest.scala @@ -32,7 +32,7 @@ class FlinkHttpClientTest extends FunSuite private val flinkJarFile = JarFile(jarId, jarFileName) private val deploymentId = ExternalDeploymentId("someDeploymentId") - val config: FlinkConfig = FlinkConfig(FiniteDuration(10, TimeUnit.SECONDS), None, "http://localhost:12345/flink", None) + val config: FlinkConfig = FlinkConfig("http://localhost:12345/flink", None) test("uploadJarFileIfNotExists - should upload jar") { implicit val backend = SttpBackendStub.asynchronousFuture.whenRequestMatchesPartial { diff --git a/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/spel/internal/propertyAccessors.scala b/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/spel/internal/propertyAccessors.scala index 2225ca92acd..5844915a084 100644 --- a/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/spel/internal/propertyAccessors.scala +++ b/engine/interpreter/src/main/scala/pl/touk/nussknacker/engine/spel/internal/propertyAccessors.scala @@ -15,8 +15,6 @@ import scala.concurrent.duration._ object propertyAccessors { def configured(): Seq[PropertyAccessor] = { - //FIXME: configurable timeout... - val lazyValuesTimeout = 1 minute Seq( new ReflectivePropertyAccessor(), diff --git a/engine/processReports/src/it/scala/pl/touk/nussknacker/processCounts/influxdb/InfluxCountsReporterSpec.scala b/engine/processReports/src/it/scala/pl/touk/nussknacker/processCounts/influxdb/InfluxCountsReporterSpec.scala index ab4ef6577b3..8027dd85862 100644 --- a/engine/processReports/src/it/scala/pl/touk/nussknacker/processCounts/influxdb/InfluxCountsReporterSpec.scala +++ b/engine/processReports/src/it/scala/pl/touk/nussknacker/processCounts/influxdb/InfluxCountsReporterSpec.scala @@ -4,7 +4,6 @@ import com.dimafeng.testcontainers.{ForAllTestContainer, InfluxDBContainer} import org.asynchttpclient.DefaultAsyncHttpClientConfig import org.influxdb.InfluxDBFactory import org.influxdb.dto.Point -import org.scalatest.concurrent.Eventually import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.{Assertion, FunSuite, Matchers} import pl.touk.nussknacker.processCounts.{CannotFetchCountsError, ExecutionCount, RangeCount} @@ -110,7 +109,7 @@ class InfluxCountsReporterSpec extends FunSuite with ForAllTestContainer with Ta private val influxDB = InfluxDBFactory.connect(container.url, container.username, container.password) def reporter(queryMode: QueryMode.Value) = new InfluxCountsReporter(env, - InfluxConfig(container.url + "/query", container.username, container.password, container.database, queryMode, Some(config)) + InfluxConfig(container.url + "/query", Option(container.username), Option(container.password), container.database, queryMode, Some(config)) ) influxDB.setDatabase(container.database) diff --git a/engine/processReports/src/main/scala/pl/touk/nussknacker/processCounts/influxdb/InfluxConfig.scala b/engine/processReports/src/main/scala/pl/touk/nussknacker/processCounts/influxdb/InfluxConfig.scala index bee56b40d5d..4a1c7ef7b89 100644 --- a/engine/processReports/src/main/scala/pl/touk/nussknacker/processCounts/influxdb/InfluxConfig.scala +++ b/engine/processReports/src/main/scala/pl/touk/nussknacker/processCounts/influxdb/InfluxConfig.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.processCounts.influxdb -case class InfluxConfig(influxUrl: String, user: String, password: String, +case class InfluxConfig(influxUrl: String, user: Option[String], password: Option[String], database: String, queryMode: QueryMode.Value = QueryMode.OnlySingleDifference, metricsConfig: Option[MetricsConfig]) diff --git a/engine/processReports/src/main/scala/pl/touk/nussknacker/processCounts/influxdb/SimpleInfluxClient.scala b/engine/processReports/src/main/scala/pl/touk/nussknacker/processCounts/influxdb/SimpleInfluxClient.scala index a9eba6a39a8..5c71f0dab96 100644 --- a/engine/processReports/src/main/scala/pl/touk/nussknacker/processCounts/influxdb/SimpleInfluxClient.scala +++ b/engine/processReports/src/main/scala/pl/touk/nussknacker/processCounts/influxdb/SimpleInfluxClient.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.processCounts.influxdb import java.util.concurrent.TimeUnit - import sttp.client._ import sttp.client.circe._ import io.circe.Decoder @@ -9,6 +8,7 @@ import pl.touk.nussknacker.engine.sttp.SttpJson import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future} +import scala.language.implicitConversions class InfluxException(cause: Throwable) extends Exception(cause) case class InvalidInfluxResponse(message: String, cause: Throwable) extends InfluxException(cause) { @@ -24,8 +24,12 @@ class SimpleInfluxClient(config: InfluxConfig)(implicit backend: SttpBackend[Fut private val uri = uri"${config.influxUrl}" def query(query: String)(implicit ec: ExecutionContext): Future[List[InfluxSeries]] = { - basicRequest.get(uri.params("db" -> config.database, "q" -> query)) - .auth.basic(config.user, config.password) + def addAuth[T, S](req: Request[T, S]): RequestT[Identity, T, S] = (for { + user <- config.user + password <- config.password + } yield req.auth.basic(user, password)).getOrElse(req) + + addAuth(basicRequest.get(uri.params("db" -> config.database, "q" -> query))) .response(asJson[InfluxResponse]) .send() .flatMap(SttpJson.failureToFuture[InfluxResponse]) diff --git a/nussknacker-dist/src/universal/conf/application.conf b/nussknacker-dist/src/universal/conf/application.conf index b6ffe641461..351e60076e1 100644 --- a/nussknacker-dist/src/universal/conf/application.conf +++ b/nussknacker-dist/src/universal/conf/application.conf @@ -1,4 +1,6 @@ -base: { include "base-application.conf" } +#This file contains sensible defaults for simple, recommended Nussknacker deployment - with one generic model, deployed on Flink +#We assume integration with Grafana, Influx and Flink as in base docker setup. In other deployments one will probably need to change URLs +#In most cases it should be possible to override parts of this configuration by providing additional config file (see demo/docker/nussknacker/nussknacker.conf) environment: "local" @@ -6,14 +8,12 @@ categoriesConfig: { "Default": "streaming" } -modelClassPath: ["model/genericModel.jar"] -modelClassPath: ${?MODEL_CLASS_PATH} - processTypes { "streaming": { - engineConfig: ${base.flinkEngineConfig} + engineConfig: ${flinkEngineConfig} modelConfig: { - classPath: ${modelClassPath} + classPath: ["model/genericModel.jar"] + classPath: ${?MODEL_CLASS_PATH} rocksDB: { checkpointDataUri: ${?FLINK_ROCKSDB_CHECKPOINT_DATA_URI} } @@ -22,17 +22,25 @@ processTypes { } } +flinkEngineConfig { + jobManagerTimeout: 1m + type: "flinkStreaming" + restUrl: "http://jobmanager:8081" + restUrl: ${?FLINK_REST_URL} + queryableStateProxyUrl: "taskmanager:9069" + queryableStateProxyUrl: ${?FLINK_QUERYABLE_STATE_PROXY_URL} +} + +grafanaUrl: "/grafana" +grafanaUrl: ${?GRAFANA_URL} + metricsSettings: { - url: ${base.grafanaUrl}"/d/$dashboard?theme=dark&var-processName=$process&var-env="${environment} + url: ${grafanaUrl}"/d/$dashboard?theme=dark&var-processName=$process&var-env="${environment} defaultDashboard: "nussknacker-scenario" - processingTypeToDashboard: { - "streaming": "nussknacker-scenario" - } } -# TODO: lightbend config can't include files on root level - move nussknacker config on nk level and get rid of this below -db: ${base.db} - -commentSettings: ${base.commentSettings} -attachmentsPath: ${base.attachmentsPath} -countsSettings: ${base.countsSettings} +countsSettings { + influxUrl: "http://influxdb:8086/query" + influxUrl: ${?COUNTS_URL} + database: "esp" +} diff --git a/nussknacker-dist/src/universal/conf/base-application.conf b/nussknacker-dist/src/universal/conf/base-application.conf deleted file mode 100644 index 073351a0c32..00000000000 --- a/nussknacker-dist/src/universal/conf/base-application.conf +++ /dev/null @@ -1,72 +0,0 @@ -storageDir: ./storage -storageDir: ${?STORAGE_DIR} - -dbFilePath: ${storageDir}"/db" -dbFilePath: ${?DB_FILE_PATH} - -dbName: "db" -dbName: ${?DB_NAME} - -dbUser: "sa" -dbUser: ${?DB_USER} - -dbPassword: "" -dbPassword: ${?DB_PASSWORD} - -dbUrl: "jdbc:hsqldb:file:"${dbFilePath}";sql.syntax_ora=true" -dbUrl: ${?DB_URL} - -dbDriver: "org.hsqldb.jdbc.JDBCDriver" -dbDriver: ${?DB_DRIVER} - -dbConnectionTimeout: 30000 -dbConnectionTimeout: ${?DB_CONNECTION_TIMEOUT} - -db { - url: ${dbUrl} - driver: ${dbDriver} - password: ${dbPassword} - user: ${dbUser} - connectionTimeout: ${dbConnectionTimeout} -} - -commentSettings: { - matchExpression: "(issues/[0-9]*)" - link: "https://github.com/TouK/nussknacker/$1" -} - -developmentMode: ${?DEVELOPMENT_MODE} - -attachmentsPath: ${storageDir}"/attachments" - -proxyUrl: "http://localhost:8081" -proxyUrl: ${?PROXY_URL} - -grafanaUrl: ${proxyUrl}"/grafana" -grafanaUrl: ${?GRAFANA_URL} - -#TODO: Figure out the defaults. It's tricky part, because influxUrl and grafanaUrl can point to same location, but one link is used -#from browser and the other - from UI backend and in docker/nginx setups they *will* be different... -countsUrl: ${grafanaUrl}"/api/datasources/proxy/1/query" -countsUrl: ${?COUNTS_URL} - -countsSettings { - user: "admin" - password: "admin" - influxUrl: ${countsUrl} - database: "esp" -} - -# Base streaming configuration -flinkRestUrl: "http://jobmanager:8081" -flinkRestUrl: ${?FLINK_REST_URL} - -flinkQueryableStateProxyUrl: "taskmanager:9069" -flinkQueryableStateProxyUrl: ${?FLINK_QUERYABLE_STATE_PROXY_URL} - -flinkEngineConfig { - jobManagerTimeout: 1m - type: "flinkStreaming" - restUrl: ${flinkRestUrl} - queryableStateProxyUrl: ${flinkQueryableStateProxyUrl} -} diff --git a/nussknacker-dist/src/universal/conf/dev-application.conf b/nussknacker-dist/src/universal/conf/dev-application.conf index 834d58a725e..afc60b54d04 100644 --- a/nussknacker-dist/src/universal/conf/dev-application.conf +++ b/nussknacker-dist/src/universal/conf/dev-application.conf @@ -1,10 +1,8 @@ +# This config contains sample configuration that allows for easier experiments with more advanced Nussknacker features +# In particular, it can be used during development and/or Nussknacker testing +# To run it, one must use Nussknacker distribution which contains managementSample and standaloneSample (e.g. staging-latest docker images) # This config is exposed in development dist and locally in development (se ui/server/runServer.sh) - -base: { include "base-application.conf" } - -//TODO dsw change this to local -//to be consistent with docker/demo -environment: "demo" +environment: "local" categoriesConfig: { "Default": "streaming-generic" @@ -24,9 +22,17 @@ standaloneModelDir: ${?STANDALONE_MODEL_DIR} standaloneManagmentUrl: "http://localhost:8070" standaloneManagmentUrl: ${?STANDALONE_MANAGMENT_URL} +flinkEngineConfig { + type: "flinkStreaming" + restUrl: "http://jobmanager:8081" + restUrl: ${?FLINK_REST_URL} + queryableStateProxyUrl: "taskmanager:9069" + queryableStateProxyUrl: ${?FLINK_QUERYABLE_STATE_PROXY_URL} +} + processTypes { "streaming": { - engineConfig: ${base.flinkEngineConfig} + engineConfig: ${flinkEngineConfig} modelConfig: { classPath: [ ${managementModelDir}"/managementSample.jar" ] rocksDB: { @@ -35,7 +41,7 @@ processTypes { } } "streaming-generic": { - engineConfig: ${base.flinkEngineConfig} + engineConfig: ${flinkEngineConfig} modelConfig: { classPath: [ ${genericModelDir}"/genericModel.jar" ] rocksDB: { @@ -54,8 +60,11 @@ processTypes { } } +grafanaUrl: "/grafana" +grafanaUrl: ${?GRAFANA_URL} + metricsSettings { - url: ${base.grafanaUrl}"/d/$dashboard?theme=dark&var-processName=$process&var-env="${environment} + url: ${grafanaUrl}"/d/$dashboard?theme=dark&var-processName=$process&var-env="${environment} defaultDashboard: "nussknacker-scenario" processingTypeToDashboard: { #Note: currently in demo docker-compose these dashboards do not exist. We keep them here, so @@ -70,7 +79,7 @@ environmentAlert: { } customTabs = [ - {name: "Metrics", url: ${base.grafanaUrl}"/dashboard/db/"${metricsSettings.defaultDashboard}"?theme=dark&var-env="${environment}, id: "metrics"}, + {name: "Metrics", url: ${grafanaUrl}"/dashboard/db/"${metricsSettings.defaultDashboard}"?theme=dark&var-env="${environment}, id: "metrics"}, ] secondaryEnvironmentUri: "http://localhost:8080/api" @@ -86,15 +95,21 @@ secondaryEnvironment { } } -standaloneEngineProcessLocation: ${base.storageDir}"/standaloneProcesses" +standaloneEngineProcessLocation: ${STORAGE_DIR}"/standaloneProcesses" customProcesses: { "customProcess1": "pl.touk.custom.NonExistingCustomProcess" } -# TODO: lightbend config can't include files on root level - move nussknacker config on nk level and get rid of this below -db: ${base.db} +commentSettings: { + matchExpression: "(issues/[0-9]*)" + link: "https://github.com/TouK/nussknacker/$1" +} + +countsSettings { + influxUrl: "http://influxdb:8086/query" + influxUrl: ${?COUNTS_URL} + database: "esp" +} -commentSettings: ${base.commentSettings} -attachmentsPath: ${base.attachmentsPath} -countsSettings: ${base.countsSettings} +developmentMode: ${?DEVELOPMENT_MODE} diff --git a/ui/server/src/main/resources/defaultUiConfig.conf b/ui/server/src/main/resources/defaultUiConfig.conf index c676dc53ecf..cd7f98aa215 100644 --- a/ui/server/src/main/resources/defaultUiConfig.conf +++ b/ui/server/src/main/resources/defaultUiConfig.conf @@ -1,12 +1,21 @@ -#We use defaultUConfig.conf instead of reference.conf, as we don't want these properties in config loaded in -# +#We use defaultUConfig.conf instead of reference.conf, as we don't want these properties in config loaded in model configuration +#This configuration file contains sensible designer defaults for all Nussknacker deployments, without assumptions about deployment models and external tools (grafana, flink etc.) +#All models configurations also shouldn't be in this file + +storageDir: ./storage +storageDir: ${?STORAGE_DIR} db { - url: "jdbc:hsqldb:file:data/db;sql.syntax_ora=true" + url: "jdbc:hsqldb:file:"${storageDir}"/db;sql.syntax_ora=true" + url: ${?DB_URL} driver: "org.hsqldb.jdbc.JDBCDriver" + driver: ${?DB_DRIVER} user: "SA" + user: ${?DB_USER} password: "" + password: ${?DB_PASSWORD} connectionTimeout: 30000 + connectionTimeout: ${?DB_CONNECTION_TIMEOUT} #we use low values here, as NK UI is not very data-intensive maximumPoolSize: 5 minimumIdle: 1 @@ -14,6 +23,8 @@ db { numThreads: 5 } +attachmentsPath: ${storageDir}"/attachments" + http { port: 8080 interface: "0.0.0.0" @@ -34,17 +45,6 @@ akka { } } -processConfig { - checkpointConfig { - checkpointInterval: 10s - } - timeout: 10s - asyncExecutionConfig { - bufferSize: 200 - workers: 8 - } -} - intervalTimeSettings: { processes: 20000 healthCheck: 30000