[Refactor] Configuration cleanup (#1906)
* [Refactor] Configuration cleanup

* review comments
mproch authored Jul 15, 2021
1 parent 6635fb8 commit 596f22f
Showing 16 changed files with 87 additions and 138 deletions.
1 change: 0 additions & 1 deletion demo/docker/docker-compose.yml
@@ -10,7 +10,6 @@ services:
#multiple, comma separated, config files can be used. They will be merged in order, via HOCON fallback mechanism
#https://github.com/lightbend/config/blob/master/HOCON.md#config-object-merging-and-file-merging
NUSSKNACKER_CONFIG_FILE: ${NUSSKNACKER_CONFIG_FILE-/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/nussknacker.conf}
COUNTS_URL: http://influxdb:8086/query
JDK_JAVA_OPTIONS: -Xmx256M
FLINK_ROCKSDB_CHECKPOINT_DATA_URI: file:///opt/flink/data/rocksdb-checkpoints
volumes:
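
The docker-compose comment above describes HOCON fallback merging: every file listed in NUSSKNACKER_CONFIG_FILE is parsed and layered onto the others, so one file's entries fill the gaps left by the other. A minimal sketch of that merge with the Typesafe Config library; the inlined contents, and the assumption that the overriding file sits on top, are illustrative only, and the actual loading code is not part of this diff:

    import com.typesafe.config.{Config, ConfigFactory}

    // Sketch of HOCON object merging via withFallback; file contents are inlined
    // here instead of being read from /opt/nussknacker/conf/*.conf.
    object ConfigMergeSketch extends App {
      val overrides: Config = ConfigFactory.parseString(
        "components.openAPI.url = \"http://customerservice:5000/swagger\"")
      val defaults: Config = ConfigFactory.parseString(
        "components.openAPI.url = \"http://localhost:5000/swagger\"\ntimeout = 10s")

      // withFallback keeps keys of the receiver and fills missing ones from the argument
      val merged: Config = overrides.withFallback(defaults).resolve()
      println(merged.getString("components.openAPI.url")) // http://customerservice:5000/swagger
      println(merged.getDuration("timeout"))               // PT10S
    }
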
3 changes: 2 additions & 1 deletion demo/docker/nussknacker/nussknacker.conf
@@ -2,7 +2,8 @@
#Here we configure OpenAPI based enricher, which is implemented by python service in customerservice
{
processTypes.streaming.modelConfig {
classPath: ["model/genericModel.jar", "components/openapi.jar"]
#We add additional jar to model classPath
classPath += "components/openapi.jar"
components.openAPI {
url: "http://customerservice:5000/swagger"
rootUrl: "http://customerservice:5000"
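
The += operator used above is HOCON's self-referential append: classPath += "components/openapi.jar" expands to classPath = ${?classPath} ["components/openapi.jar"], so the jar is added to whatever classPath the merged base configuration already defines. A sketch under that assumption; the base value and object name are invented:

    import com.typesafe.config.ConfigFactory

    // Sketch of HOCON's += append; the overlay must sit on top of the base
    // for the self-reference to resolve against the base's classPath.
    object ClassPathAppendSketch extends App {
      val base    = ConfigFactory.parseString("classPath = [\"model/genericModel.jar\"]")
      val overlay = ConfigFactory.parseString("classPath += \"components/openapi.jar\"")

      val merged = overlay.withFallback(base).resolve()
      println(merged.getStringList("classPath"))
      // [model/genericModel.jar, components/openapi.jar]
    }
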
@@ -6,7 +6,6 @@
}

timeout: 10s
delayBetweenAttempts: 10s
checkpointConfig {
checkpointInterval: 10m
}
@@ -1,10 +1,9 @@
package pl.touk.nussknacker.engine.management

import com.typesafe.config.Config

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.DurationInt

case class FlinkConfig(jobManagerTimeout: FiniteDuration,
case class FlinkConfig(restUrl: String,
queryableStateProxyUrl: Option[String],
restUrl: String,
shouldVerifyBeforeDeploy: Option[Boolean])
jobManagerTimeout: FiniteDuration = 1 minute,
shouldVerifyBeforeDeploy: Boolean = true)
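
The refactored FlinkConfig makes restUrl the leading mandatory field, drops the Option wrapper from shouldVerifyBeforeDeploy, and gives both the job-manager timeout and the verification flag defaults. A hedged sketch of constructing it; the class shape is copied from the diff, while the surrounding loading code is not shown there:

    import scala.concurrent.duration.{DurationInt, FiniteDuration}

    // Copied shape of the refactored config: only restUrl and the queryable-state
    // proxy need to be given explicitly, the rest falls back to defaults.
    case class FlinkConfig(restUrl: String,
                           queryableStateProxyUrl: Option[String],
                           jobManagerTimeout: FiniteDuration = 1.minute,
                           shouldVerifyBeforeDeploy: Boolean = true)

    object FlinkConfigSketch extends App {
      val config = FlinkConfig("http://jobmanager:8081", Some("taskmanager:9069"))
      println(config.jobManagerTimeout)        // 1 minute
      println(config.shouldVerifyBeforeDeploy) // true
    }
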
@@ -20,7 +20,7 @@ import scala.concurrent.{Await, Future}

class FlinkRestManager(config: FlinkConfig, modelData: ModelData, mainClassName: String)
(implicit backend: SttpBackend[Future, Nothing, NothingT])
extends FlinkProcessManager(modelData, config.shouldVerifyBeforeDeploy.getOrElse(true), mainClassName) with LazyLogging {
extends FlinkProcessManager(modelData, config.shouldVerifyBeforeDeploy, mainClassName) with LazyLogging {

protected lazy val jarFile: File = new FlinkModelJar().buildJobJar(modelData)

@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.management.rest

import com.typesafe.scalalogging.LazyLogging
import io.circe.Error
import pl.touk.nussknacker.engine.api.CirceUtil
import pl.touk.nussknacker.engine.api.deployment.{ExternalDeploymentId, SavepointResult}
import pl.touk.nussknacker.engine.management.rest.flinkRestModel._
@@ -26,7 +26,7 @@ import scala.concurrent.duration._
//TODO move some tests to FlinkHttpClientTest
class FlinkRestManagerSpec extends FunSuite with Matchers with PatientScalaFutures {

private val config = FlinkConfig(10 minute, None, "http://test.pl", None)
private val config = FlinkConfig("http://test.pl", None)

private var statuses: List[JobOverview] = List()

@@ -32,7 +32,7 @@ class FlinkHttpClientTest extends FunSuite
private val flinkJarFile = JarFile(jarId, jarFileName)
private val deploymentId = ExternalDeploymentId("someDeploymentId")

val config: FlinkConfig = FlinkConfig(FiniteDuration(10, TimeUnit.SECONDS), None, "http://localhost:12345/flink", None)
val config: FlinkConfig = FlinkConfig("http://localhost:12345/flink", None)

test("uploadJarFileIfNotExists - should upload jar") {
implicit val backend = SttpBackendStub.asynchronousFuture.whenRequestMatchesPartial {
@@ -15,8 +15,6 @@ import scala.concurrent.duration._
object propertyAccessors {

def configured(): Seq[PropertyAccessor] = {
//FIXME: configurable timeout...
val lazyValuesTimeout = 1 minute

Seq(
new ReflectivePropertyAccessor(),
@@ -4,7 +4,6 @@ import com.dimafeng.testcontainers.{ForAllTestContainer, InfluxDBContainer}
import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.influxdb.InfluxDBFactory
import org.influxdb.dto.Point
import org.scalatest.concurrent.Eventually
import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.{Assertion, FunSuite, Matchers}
import pl.touk.nussknacker.processCounts.{CannotFetchCountsError, ExecutionCount, RangeCount}
@@ -110,7 +109,7 @@ class InfluxCountsReporterSpec extends FunSuite with ForAllTestContainer with Ta
private val influxDB = InfluxDBFactory.connect(container.url, container.username, container.password)

def reporter(queryMode: QueryMode.Value) = new InfluxCountsReporter(env,
InfluxConfig(container.url + "/query", container.username, container.password, container.database, queryMode, Some(config))
InfluxConfig(container.url + "/query", Option(container.username), Option(container.password), container.database, queryMode, Some(config))
)

influxDB.setDatabase(container.database)
@@ -1,6 +1,6 @@
package pl.touk.nussknacker.processCounts.influxdb

case class InfluxConfig(influxUrl: String, user: String, password: String,
case class InfluxConfig(influxUrl: String, user: Option[String], password: Option[String],
database: String,
queryMode: QueryMode.Value = QueryMode.OnlySingleDifference,
metricsConfig: Option[MetricsConfig])
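
Since user and password are now Option[String], a counts configuration without credentials is valid. A minimal sketch of reading such a config with plain Typesafe Config; queryMode and metricsConfig are omitted, and the optString helper is hypothetical rather than Nussknacker's actual loader:

    import com.typesafe.config.{Config, ConfigFactory}

    object InfluxConfigSketch extends App {
      // simplified copy of the case class above, for illustration only
      case class InfluxConfig(influxUrl: String, user: Option[String], password: Option[String], database: String)

      // read a key only if it is present
      def optString(config: Config, path: String): Option[String] =
        if (config.hasPath(path)) Some(config.getString(path)) else None

      val raw = ConfigFactory.parseString(
        "influxUrl = \"http://influxdb:8086/query\"\ndatabase = esp")

      val parsed = InfluxConfig(
        influxUrl = raw.getString("influxUrl"),
        user      = optString(raw, "user"),     // None - no credentials configured
        password  = optString(raw, "password"), // None
        database  = raw.getString("database"))
      println(parsed)
    }
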
@@ -1,14 +1,14 @@
package pl.touk.nussknacker.processCounts.influxdb

import java.util.concurrent.TimeUnit

import sttp.client._
import sttp.client.circe._
import io.circe.Decoder
import pl.touk.nussknacker.engine.sttp.SttpJson

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.implicitConversions

class InfluxException(cause: Throwable) extends Exception(cause)
case class InvalidInfluxResponse(message: String, cause: Throwable) extends InfluxException(cause) {
@@ -24,8 +24,12 @@ class SimpleInfluxClient(config: InfluxConfig)(implicit backend: SttpBackend[Fut
private val uri = uri"${config.influxUrl}"

def query(query: String)(implicit ec: ExecutionContext): Future[List[InfluxSeries]] = {
basicRequest.get(uri.params("db" -> config.database, "q" -> query))
.auth.basic(config.user, config.password)
def addAuth[T, S](req: Request[T, S]): RequestT[Identity, T, S] = (for {
user <- config.user
password <- config.password
} yield req.auth.basic(user, password)).getOrElse(req)

addAuth(basicRequest.get(uri.params("db" -> config.database, "q" -> query)))
.response(asJson[InfluxResponse])
.send()
.flatMap(SttpJson.failureToFuture[InfluxResponse])
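
The new addAuth helper attaches basic auth only when both credentials are configured and otherwise leaves the request untouched. The same pattern in isolation, assuming the sttp client 2.x dependency this module already uses; the URL and credentials are invented:

    import sttp.client._

    object OptionalAuthSketch extends App {
      // add an Authorization header only when both user and password are present
      def addAuth[T, S](user: Option[String], password: Option[String])
                       (req: Request[T, S]): Request[T, S] =
        (for {
          u <- user
          p <- password
        } yield req.auth.basic(u, p)).getOrElse(req)

      val base = basicRequest.get(uri"http://influxdb:8086/query?db=esp")

      // both credentials present -> Authorization header is added
      println(addAuth(Some("admin"), Some("secret"))(base).headers)
      // anything missing -> request is left untouched
      println(addAuth(None, None)(base).headers)
    }
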
40 changes: 24 additions & 16 deletions nussknacker-dist/src/universal/conf/application.conf
@@ -1,19 +1,19 @@
base: { include "base-application.conf" }
#This file contains sensible defaults for simple, recommended Nussknacker deployment - with one generic model, deployed on Flink
#We assume integration with Grafana, Influx and Flink as in base docker setup. In other deployments one will probably need to change URLs
#In most cases it should be possible to override parts of this configuration by providing additional config file (see demo/docker/nussknacker/nussknacker.conf)

environment: "local"

categoriesConfig: {
"Default": "streaming"
}

modelClassPath: ["model/genericModel.jar"]
modelClassPath: ${?MODEL_CLASS_PATH}

processTypes {
"streaming": {
engineConfig: ${base.flinkEngineConfig}
engineConfig: ${flinkEngineConfig}
modelConfig: {
classPath: ${modelClassPath}
classPath: ["model/genericModel.jar"]
classPath: ${?MODEL_CLASS_PATH}
rocksDB: {
checkpointDataUri: ${?FLINK_ROCKSDB_CHECKPOINT_DATA_URI}
}
@@ -22,17 +22,25 @@ processTypes {
}
}

flinkEngineConfig {
jobManagerTimeout: 1m
type: "flinkStreaming"
restUrl: "http://jobmanager:8081"
restUrl: ${?FLINK_REST_URL}
queryableStateProxyUrl: "taskmanager:9069"
queryableStateProxyUrl: ${?FLINK_QUERYABLE_STATE_PROXY_URL}
}

grafanaUrl: "/grafana"
grafanaUrl: ${?GRAFANA_URL}

metricsSettings: {
url: ${base.grafanaUrl}"/d/$dashboard?theme=dark&var-processName=$process&var-env="${environment}
url: ${grafanaUrl}"/d/$dashboard?theme=dark&var-processName=$process&var-env="${environment}
defaultDashboard: "nussknacker-scenario"
processingTypeToDashboard: {
"streaming": "nussknacker-scenario"
}
}

# TODO: lightbend config can't include files on root level - move nussknacker config on nk level and get rid of this below
db: ${base.db}

commentSettings: ${base.commentSettings}
attachmentsPath: ${base.attachmentsPath}
countsSettings: ${base.countsSettings}
countsSettings {
influxUrl: "http://influxdb:8086/query"
influxUrl: ${?COUNTS_URL}
database: "esp"
}
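
The cleanup inlines what previously came from base-application.conf and leans on HOCON's optional substitutions: a default value followed by ${?SOME_ENV_VAR} is overridden only when that variable is actually set. A small sketch of the pattern; the variable and values are illustrative:

    import com.typesafe.config.ConfigFactory

    object EnvOverrideSketch extends App {
      // same pattern as application.conf: a default, then an optional override
      val hocon =
        """
          |restUrl: "http://jobmanager:8081"
          |restUrl: ${?FLINK_REST_URL}
          |""".stripMargin

      // resolve() consults system environment variables for ${?...} references by default
      val config = ConfigFactory.parseString(hocon).resolve()
      println(config.getString("restUrl"))
      // "http://jobmanager:8081" unless FLINK_REST_URL is exported in the environment
    }
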
72 changes: 0 additions & 72 deletions nussknacker-dist/src/universal/conf/base-application.conf

This file was deleted.

47 changes: 31 additions & 16 deletions nussknacker-dist/src/universal/conf/dev-application.conf
@@ -1,10 +1,8 @@
# This config contains sample configuration that allows for easier experiments with more advanced Nussknacker features
# In particular, it can be used during development and/or Nussknacker testing
# To run it, one must use Nussknacker distribution which contains managementSample and standaloneSample (e.g. staging-latest docker images)
# This config is exposed in development dist and locally in development (see ui/server/runServer.sh)

base: { include "base-application.conf" }

//TODO dsw change this to local
//to be consistent with docker/demo
environment: "demo"
environment: "local"

categoriesConfig: {
"Default": "streaming-generic"
@@ -24,9 +22,17 @@ standaloneModelDir: ${?STANDALONE_MODEL_DIR}
standaloneManagmentUrl: "http://localhost:8070"
standaloneManagmentUrl: ${?STANDALONE_MANAGMENT_URL}

flinkEngineConfig {
type: "flinkStreaming"
restUrl: "http://jobmanager:8081"
restUrl: ${?FLINK_REST_URL}
queryableStateProxyUrl: "taskmanager:9069"
queryableStateProxyUrl: ${?FLINK_QUERYABLE_STATE_PROXY_URL}
}

processTypes {
"streaming": {
engineConfig: ${base.flinkEngineConfig}
engineConfig: ${flinkEngineConfig}
modelConfig: {
classPath: [ ${managementModelDir}"/managementSample.jar" ]
rocksDB: {
@@ -35,7 +41,7 @@ processTypes {
}
}
"streaming-generic": {
engineConfig: ${base.flinkEngineConfig}
engineConfig: ${flinkEngineConfig}
modelConfig: {
classPath: [ ${genericModelDir}"/genericModel.jar" ]
rocksDB: {
@@ -54,8 +60,11 @@ processTypes {
}
}

grafanaUrl: "/grafana"
grafanaUrl: ${?GRAFANA_URL}

metricsSettings {
url: ${base.grafanaUrl}"/d/$dashboard?theme=dark&var-processName=$process&var-env="${environment}
url: ${grafanaUrl}"/d/$dashboard?theme=dark&var-processName=$process&var-env="${environment}
defaultDashboard: "nussknacker-scenario"
processingTypeToDashboard: {
#Note: currently in demo docker-compose these dashboards do not exist. We keep them here, so
@@ -70,7 +79,7 @@ environmentAlert: {
}

customTabs = [
{name: "Metrics", url: ${base.grafanaUrl}"/dashboard/db/"${metricsSettings.defaultDashboard}"?theme=dark&var-env="${environment}, id: "metrics"},
{name: "Metrics", url: ${grafanaUrl}"/dashboard/db/"${metricsSettings.defaultDashboard}"?theme=dark&var-env="${environment}, id: "metrics"},
]

secondaryEnvironmentUri: "http://localhost:8080/api"
@@ -86,15 +95,21 @@ secondaryEnvironment {
}
}

standaloneEngineProcessLocation: ${base.storageDir}"/standaloneProcesses"
standaloneEngineProcessLocation: ${STORAGE_DIR}"/standaloneProcesses"

customProcesses: {
"customProcess1": "pl.touk.custom.NonExistingCustomProcess"
}

# TODO: lightbend config can't include files on root level - move nussknacker config on nk level and get rid of this below
db: ${base.db}
commentSettings: {
matchExpression: "(issues/[0-9]*)"
link: "https://github.com/TouK/nussknacker/$1"
}

countsSettings {
influxUrl: "http://influxdb:8086/query"
influxUrl: ${?COUNTS_URL}
database: "esp"
}

commentSettings: ${base.commentSettings}
attachmentsPath: ${base.attachmentsPath}
countsSettings: ${base.countsSettings}
developmentMode: ${?DEVELOPMENT_MODE}
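
commentSettings pairs a matchExpression with a link template, and the $1 placeholder suggests the captured regex group is substituted into the link. A hedged sketch of that substitution in plain Scala; whether Nussknacker applies it exactly like replaceAll is an assumption:

    object CommentLinkSketch extends App {
      // values copied from commentSettings above
      val matchExpression = "(issues/[0-9]*)"
      val link = "https://github.com/TouK/nussknacker/$1"

      val comment = "fixed in issues/1906"
      // replaceAll substitutes the captured group for $1 in the replacement string
      println(comment.replaceAll(matchExpression, link))
      // "fixed in https://github.com/TouK/nussknacker/issues/1906"
    }
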