Skip to content

Commit

Permalink
Review
Browse files Browse the repository at this point in the history
  • Loading branch information
Damian Święcki committed Jul 16, 2021
1 parent b1b246a commit bc61e52
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 39 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -955,10 +955,10 @@ lazy val sql = (project in component("sql")).
name := "nussknacker-sql",
libraryDependencies ++= Seq(
"com.zaxxer" % "HikariCP" % "4.0.3",
"org.apache.flink" %% "flink-streaming-scala" % flinkV % "provided,optional",
"org.apache.flink" %% "flink-streaming-scala" % flinkV % Provided,
"org.scalatest" %% "scalatest" % scalaTestV % "it,test"
),
).dependsOn(api, process % "provided,optional", engineStandalone % "provided,optional", standaloneUtil % "provided,optional", httpUtils % Provided, flinkTestUtil % "it,test", kafkaTestUtil % "it,test")
).dependsOn(api % Provided, process % Provided, engineStandalone % Provided, standaloneUtil % Provided, httpUtils % Provided, flinkTestUtil % "it,test", kafkaTestUtil % "it,test")

lazy val buildUi = taskKey[Unit]("builds ui")

Expand Down
8 changes: 8 additions & 0 deletions demo/docker/docker-compose-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ services:
volumes:
- ./flink/flink-conf.yaml:/tmp/flink-conf.yaml
- ./flink/flink-entrypoint.sh:/flink-entrypoint.sh
- ./flink/postgresql-42.2.19.jar:/opt/flink/lib/postgresql-42.2.19.jar
- nussknacker_storage_flink:/opt/flink/data

taskmanager:
Expand All @@ -117,6 +118,7 @@ services:
- ./flink/flink-conf.yaml:/tmp/flink-conf.yaml
- ./flink/flink-entrypoint.sh:/flink-entrypoint.sh
- nussknacker_storage_flink:/opt/flink/data
- ./flink/postgresql-42.2.19.jar:/opt/flink/lib/postgresql-42.2.19.jar
ulimits:
nproc: 70000
nofile:
Expand Down Expand Up @@ -154,6 +156,12 @@ services:
customerservice:
container_name: nussknacker_customerservice
build: customerservice
postgres:
container_name: nussknacker_postgres
hostname: nussknacker_postgres
image: ghusta/postgres-world-db
ports:
- "5432:5432"

volumes:
nussknacker_storage_zookeeper_datalog:
Expand Down
25 changes: 14 additions & 11 deletions demo/docker/nussknacker/nussknacker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
#Here we configure OpenAPI based enricher, which is implemented by python service in customerservice
{
sqlEnricherDbPool {
driverClassName: "org.hsqldb.jdbc.JDBCDriver"
url: "jdbc:hsqldb:file:/opt/nussknacker/storage/db;sql.syntax_ora=true"
username: "sa"
password: ""
driverClassName: "org.postgresql.Driver"
url: "jdbc:postgresql://nussknacker_postgres:5432/world-db"
username: "world"
password: "world123"
}

processTypes.streaming.modelConfig {
#We add additional jar to model classPath
classPath += "components/openapi.jar" += "components/sql.jar"
classPath += "components/openapi.jar"
classPath += "components/sql.jar"
components.openAPI {
url: "http://customerservice:5000/swagger"
rootUrl: "http://customerservice:5000"
Expand All @@ -20,12 +21,14 @@
components.databaseEnricher {
categories: ["Default"]
config: {
databaseQueryEnrichers: [
{ name: "db-query", dbPool: ${sqlEnricherDbPool} }
]
databaseLookupEnrichers: [
{ name: "db-lookup", dbPool: ${sqlEnricherDbPool} }
]
databaseQueryEnricher {
name: "db-query"
dbPool: ${sqlEnricherDbPool}
}
databaseLookupEnricher {
name: "db-lookup"
dbPool: ${sqlEnricherDbPool}
}
}
}
}
Expand Down
27 changes: 15 additions & 12 deletions docs/components/Sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ Nussknacker `Sql` enricher can connect to SQL databases with HikariCP JDBC conne
It supports:

- real time database lookup - a simplified mode where you can select from table filtering for a specified key.
- generic DDM/DDL query enricher
- both `databaseQueryEnricher` as well as `databaseLookupEnricher` can cache queries results
- you can specify cache TTL (Time To Live) duration via `Cache TTL` property
- for `databaseQueryEnricher` you can specify `Result Strategy`
Expand Down Expand Up @@ -42,28 +41,32 @@ myDatabasePool {
| maxTotal | false | 10 | Maximum pool size |
| initialSize | false | 0 | Minimum idle size |

> As a user you have to provide the database driver. It should be placed in flink /lib folder, more info can be found in [Flink Documentation](https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code)
Next you have to configure component itself.

You can have multiple databaseQueryEnrichers and databaseLookupEnrichers for multiple various database connections. You
You can have multiple components for multiple various database connections. You
can also specify only one of them.

```
components {
yourUniqueComponentName: {
type: databaseEnricher #this defines your component type
config: {
databaseQueryEnrichers: [
{ name: "myDatabaseQuery", dbPool: ${myDatabasePool} }
]
databaseLookupEnrichers: [
{ name: "myDatabaseLookup", dbPool: ${myDatabasePool} }
]
databaseQueryEnricher {
name: "myDatabaseQuery"
dbPool: ${myDatabasePool}
}
databaseLookupEnricher {
name: "myDatabaseLookup"
dbPool: ${myDatabasePool}
}
}
}
}
```

| Parameter | Required | Default | Description |
| ---------- | -------- | ------- | ----------- |
| databaseQueryEnrichers | true | | List of database query enrichers components |
| databaseLookupEnrichers | true | | List of database lookup components |
| Parameter | Required | Default | Description |
| ---------- | -------- | ------- | ----------- |
| databaseQueryEnricher | true | | Database query enricher component |
| databaseLookupEnricher | true | | Database lookup component |
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,11 @@ class DatabaseEnricherComponentProvider extends ComponentProvider {

override def create(config: Config, dependencies: ProcessObjectDependencies): List[ComponentDefinition] = {
val componentConfig = config.getConfig("config")
val dbQueryEnrichers = componentConfig.as[List[DbEnricherConfig]]("databaseQueryEnrichers").map { dbEnricherConfig =>
ComponentDefinition(
name = dbEnricherConfig.name,
component = new DatabaseQueryEnricher(dbEnricherConfig.dbPool))
}
val dbLookupEnrichers = componentConfig.as[List[DbEnricherConfig]]("databaseLookupEnrichers").map { dbEnricherConfig =>
ComponentDefinition(
name = dbEnricherConfig.name,
component = new DatabaseLookupEnricher(dbEnricherConfig.dbPool))
}
dbQueryEnrichers ++ dbLookupEnrichers
val queryConfig = componentConfig.as[DbEnricherConfig]("databaseQueryEnricher")
val lookupConfig = componentConfig.as[DbEnricherConfig]("databaseLookupEnricher")
val dbQueryEnrichers = ComponentDefinition(name = queryConfig.name, component = new DatabaseQueryEnricher(queryConfig.dbPool))
val dbLookupEnrichers = ComponentDefinition(name = lookupConfig.name, component = new DatabaseLookupEnricher(lookupConfig.dbPool))
List(dbQueryEnrichers, dbLookupEnrichers)
}

override def isCompatible(version: NussknackerVersion): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.sql.service

import pl.touk.nussknacker.engine.api.process.RunMode
import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.api.{ContextId, ServiceInvoker}
Expand All @@ -25,7 +26,7 @@ class DatabaseEnricherInvoker(query: String,
}

override def invokeService(params: Map[String, Any])
(implicit ec: ExecutionContext, collector: ServiceInvocationCollector, contextId: ContextId): Future[queryExecutor.QueryResult] =
(implicit ec: ExecutionContext, collector: ServiceInvocationCollector, contextId: ContextId, runMode: RunMode): Future[queryExecutor.QueryResult] =
Future.successful {
queryDatabase(queryArgumentsExtractor(argsCount, params))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pl.touk.nussknacker.sql.service

import com.github.benmanes.caffeine.cache.Caffeine
import pl.touk.nussknacker.engine.api.ContextId
import pl.touk.nussknacker.engine.api.process.RunMode
import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.sql.db.query.{QueryArguments, QueryArgumentsExtractor, QueryResultStrategy}
Expand Down Expand Up @@ -33,7 +34,7 @@ class DatabaseEnricherInvokerWithCache(query: String,
.build[CacheKey, CacheEntry[queryExecutor.QueryResult]]

override def invokeService(params: Map[String, Any])
(implicit ec: ExecutionContext, collector: ServiceInvocationCollector, contextId: ContextId): Future[queryExecutor.QueryResult] = {
(implicit ec: ExecutionContext, collector: ServiceInvocationCollector, contextId: ContextId, runMode: RunMode): Future[queryExecutor.QueryResult] = {
val queryArguments = queryArgumentsExtractor(argsCount, params)
val cacheKey = CacheKey(query, queryArguments)
val result = Option(cache.getIfPresent(cacheKey)).map(_.value).getOrElse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.touk.nussknacker.sql.utils
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.deployment.DeploymentData
import pl.touk.nussknacker.engine.api.process.RunMode
import pl.touk.nussknacker.engine.api.test.EmptyInvocationCollector
import pl.touk.nussknacker.engine.api.test.InvocationCollectors.ServiceInvocationCollector

Expand All @@ -14,6 +15,7 @@ trait BaseDatabaseQueryEnricherTest extends FunSuite with Matchers with BeforeAn
implicit val contextId: ContextId = ContextId("")
implicit val metaData: MetaData = MetaData("", StreamMetaData())
implicit val collector: ServiceInvocationCollector = EmptyInvocationCollector.Instance
implicit val runMode: RunMode = RunMode.Test

val jobData: JobData = JobData(MetaData("", StreamMetaData()), ProcessVersion.empty, DeploymentData.empty)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.touk.nussknacker.sql.utils
import org.scalatest.Matchers
import org.scalatest.concurrent.ScalaFutures
import pl.touk.nussknacker.engine.api.deployment.DeploymentData
import pl.touk.nussknacker.engine.api.process.RunMode
import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
import pl.touk.nussknacker.engine.graph.EspProcess
import pl.touk.nussknacker.engine.resultcollector.ProductionServiceInvocationCollector
Expand All @@ -14,6 +15,8 @@ import pl.touk.nussknacker.engine.util.SynchronousExecutionContext.ctx

trait StandaloneProcessTest extends Matchers with ScalaFutures {

val runMode: RunMode = RunMode.Test

def modelData: LocalModelData

def contextPreparer: StandaloneContextPreparer
Expand All @@ -29,7 +32,7 @@ trait StandaloneProcessTest extends Matchers with ScalaFutures {
}

private def prepareInterpreter(process: EspProcess): StandaloneProcessInterpreter = {
val validatedInterpreter = StandaloneProcessInterpreter(process, contextPreparer, modelData, Nil, ProductionServiceInvocationCollector)
val validatedInterpreter = StandaloneProcessInterpreter(process, contextPreparer, modelData, Nil, ProductionServiceInvocationCollector, runMode)

validatedInterpreter shouldBe 'valid
validatedInterpreter.toEither.right.get
Expand Down

0 comments on commit bc61e52

Please sign in to comment.