Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sql import #1857

Merged
merged 7 commits into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,15 @@ lazy val dist = {
(assembly in Compile) in generic,
(assembly in Compile) in flinkProcessManager,
(assembly in Compile) in engineStandalone,
(assembly in Compile) in openapi
(assembly in Compile) in openapi,
(assembly in Compile) in sql,
).value,
mappings in Universal ++= Seq(
(crossTarget in generic).value / "genericModel.jar" -> "model/genericModel.jar",
(crossTarget in flinkProcessManager).value / "nussknacker-flink-manager.jar" -> "managers/nussknacker-flink-manager.jar",
(crossTarget in engineStandalone).value / "nussknacker-standalone-manager.jar" -> "managers/nussknacker-standalone-manager.jar",
(crossTarget in openapi).value / "openapi.jar" -> "components/openapi.jar"
(crossTarget in openapi).value / "openapi.jar" -> "components/openapi.jar",
(crossTarget in sql).value / "sql.jar" -> "components/sql.jar"
),
/* //FIXME: figure out how to filter out only for .tgz, not for docker
mappings in Universal := {
Expand Down Expand Up @@ -387,7 +389,6 @@ def engine(name: String) = file(s"engine/$name")

def component(name: String) = file(s"engine/components/$name")


lazy val engineStandalone = (project in engine("standalone/engine")).
configs(IntegrationTest).
settings(commonSettings).
Expand Down Expand Up @@ -943,6 +944,22 @@ lazy val openapi = (project in component("openapi")).
),
).dependsOn(api % Provided, process % Provided, engineStandalone % Provided, standaloneUtil % Provided, httpUtils % Provided, flinkTestUtil % "it,test", kafkaTestUtil % "it,test")

// SQL enricher component, published as "nussknacker-sql"; "sql.jar" is the name handed to assemblySampleSettings.
// commonSettings is applied exactly once, after Defaults.itSettings. It used to be listed both before and
// after itSettings, which applied it twice (any `+=` / `++=` entries it contains were appended twice).
// Keeping the later application leaves the final value of every `:=` setting unchanged.
lazy val sql = (project in component("sql")).
  configs(IntegrationTest).
  settings(Defaults.itSettings).
  settings(commonSettings).
  settings(assemblySampleSettings("sql.jar"): _*).
  settings(publishAssemblySettings: _*).
  settings(
    name := "nussknacker-sql",
    libraryDependencies ++= Seq(
      // connection pool used by the enrichers
      "com.zaxxer" % "HikariCP" % "4.0.3",
      "org.apache.flink" %% "flink-streaming-scala" % flinkV % Provided,
      "org.scalatest" %% "scalatest" % scalaTestV % "it,test"
    ),
  ).dependsOn(api % Provided, process % Provided, engineStandalone % Provided, standaloneUtil % Provided, httpUtils % Provided, flinkTestUtil % "it,test", kafkaTestUtil % "it,test")

lazy val buildUi = taskKey[Unit]("builds ui")

def runNpm(command: String, errorMessage: String, outputPath: File): Unit = {
Expand Down Expand Up @@ -1086,7 +1103,7 @@ lazy val modules = List[ProjectReference](
engineStandalone, standaloneApp, flinkProcessManager, flinkPeriodicProcessManager, standaloneSample, flinkManagementSample, managementJavaSample, generic,
openapi, process, interpreter, benchmarks, kafka, avroFlinkUtil, kafkaFlinkUtil, kafkaTestUtil, util, testUtil, flinkUtil, flinkModelUtil,
flinkTestUtil, standaloneUtil, standaloneApi, api, security, flinkApi, processReports, httpUtils, queryableState,
restmodel, listenerApi, ui
restmodel, listenerApi, ui, sql
)
lazy val modulesWithBom: List[ProjectReference] = bom :: modules

Expand Down
10 changes: 10 additions & 0 deletions demo/docker/docker-compose-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ services:
volumes:
- ./flink/flink-conf.yaml:/tmp/flink-conf.yaml
- ./flink/flink-entrypoint.sh:/flink-entrypoint.sh
# only needed for the database enricher - can be removed if you do not use it
- ./flink/postgresql-42.2.19.jar:/opt/flink/lib/postgresql-42.2.19.jar
- nussknacker_storage_flink:/opt/flink/data

taskmanager:
Expand All @@ -117,6 +119,8 @@ services:
- ./flink/flink-conf.yaml:/tmp/flink-conf.yaml
- ./flink/flink-entrypoint.sh:/flink-entrypoint.sh
- nussknacker_storage_flink:/opt/flink/data
# only needed for the database enricher - can be removed if you do not use it
- ./flink/postgresql-42.2.19.jar:/opt/flink/lib/postgresql-42.2.19.jar
ulimits:
nproc: 70000
nofile:
Expand Down Expand Up @@ -154,6 +158,12 @@ services:
customerservice:
container_name: nussknacker_customerservice
build: customerservice
postgres:
container_name: nussknacker_postgres
hostname: nussknacker_postgres
image: ghusta/postgres-world-db
ports:
- "5432:5432"

volumes:
nussknacker_storage_zookeeper_datalog:
Expand Down
Binary file added demo/docker/flink/postgresql-42.2.19.jar
Binary file not shown.
22 changes: 22 additions & 0 deletions demo/docker/nussknacker/nussknacker.conf
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
#This configuration augments and overrides the configuration in the docker image
#Here we configure the OpenAPI based enricher, which is implemented by the python service in customerservice,
#and the database enrichers, which use the nussknacker_postgres database
{
sqlEnricherDbPool {
driverClassName: "org.postgresql.Driver"
url: "jdbc:postgresql://nussknacker_postgres:5432/world-db"
username: "world"
password: "world123"
}

scenarioTypes.streaming.modelConfig {
#We add additional jar to model classPath
classPath += "components/openapi.jar"
classPath += "components/sql.jar"
components.openAPI {
url: "http://customerservice:5000/swagger"
rootUrl: "http://customerservice:5000"
categories: ["Default"]
}

components.databaseEnricher {
categories: ["Default"]
config: {
databaseQueryEnricher {
name: "db-query"
dbPool: ${sqlEnricherDbPool}
}
databaseLookupEnricher {
name: "db-lookup"
dbPool: ${sqlEnricherDbPool}
}
}
}
}
}
157 changes: 157 additions & 0 deletions docs/components/Sql.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
Overview
========

The Nussknacker `Sql` enricher can connect to SQL databases using the HikariCP JDBC connection pool.

It supports:

- real-time database lookup - a simplified mode in which you select rows from a table, filtered by a specified key.
- both `databaseQueryEnricher` and `databaseLookupEnricher` can cache query results
- you can specify cache TTL (Time To Live) duration via `Cache TTL` property
- for `databaseQueryEnricher` you can specify `Result Strategy`
- `Result set` - for retrieving whole query result set
- `Single result` for retrieving single value

Configuration
=============

Sample configuration:

First, you have to configure the database connection pool that your SQL enricher will use:

```
myDatabasePool {
driverClassName: ${dbDriver}
url: ${myDatabaseUrl}
username: ${myDatabaseUser}
password: ${myDatabasePassword}
timeout: ${dbConnectionTimeout}
initialSize: ${dbInitialPoolSize}
maxTotal: ${dbMaxPoolSize}
}
```

| Parameter | Required | Default | Description |
| ---------- | -------- | ------- | ----------- |
| url | true | | URL with your database resource |
| username | true | | Authentication username |
| password | true | | Authentication password |
| driverClassName | true | | Database driver class name |
| timeout | false | 30s | Connection timeout |
| maxTotal | false | 10 | Maximum pool size |
| initialSize | false | 0 | Minimum idle size |

> As a user you have to provide the database driver.
> It should be placed in flink /lib folder (/opt/flink/lib), more info can be found in [Flink Documentation](https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code).
> Additionally it should be placed in nussknacker /lib folder (/opt/nussknacker/lib)

Next you have to configure component itself.

You can define multiple components, one for each database connection, or just a single one. Each component must configure both `databaseQueryEnricher` and `databaseLookupEnricher`.

```
components {
yourUniqueComponentName: {
type: databaseEnricher #this defines your component type
config: {
databaseQueryEnricher {
name: "myDatabaseQuery"
dbPool: ${myDatabasePool}
}
databaseLookupEnricher {
name: "myDatabaseLookup"
dbPool: ${myDatabasePool}
}
}
}
}
```

| Parameter | Required | Default | Description |
| ---------- | -------- | ------- | ----------- |
| databaseQueryEnricher | true | | Database query enricher component |
| databaseLookupEnricher | true | | Database lookup component |

### Handling typical errors

The most common problems are:

##### Problem: Database driver is missing in taskManager
```
java.lang.RuntimeException: Failed to load driver class org.postgresql.Driver in either of HikariConfig class loader or Thread context classloader
at com.zaxxer.hikari.HikariConfig.setDriverClassName(HikariConfig.java:491)
at pl.touk.nussknacker.sql.db.pool.HikariDataSourceFactory$.apply(HikariDataSourceFactory.scala:15)
at pl.touk.nussknacker.sql.service.DatabaseQueryEnricher.open(DatabaseQueryEnricher.scala:87)
at pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData.$anonfun$open$1(FlinkProcessCompilerData.scala:42)
at pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData.$anonfun$open$1$adapted(FlinkProcessCompilerData.scala:42)
at scala.collection.immutable.List.foreach(List.scala:388)
at pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData.open(FlinkProcessCompilerData.scala:42)
at pl.touk.nussknacker.engine.process.ProcessPartFunction.open(ProcessPartFunction.scala:27)
at pl.touk.nussknacker.engine.process.ProcessPartFunction.open$(ProcessPartFunction.scala:25)
at pl.touk.nussknacker.engine.process.registrar.AsyncInterpretationFunction.open(AsyncInterpretationFunction.scala:36)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:154)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Unknown Source)
```

##### Solution:

Add the appropriate database driver to your `/opt/flink/lib` directory

##### Problem: Database driver is missing in jobManager
```
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Compilation errors: CannotCreateObjectError(No suitable driver found for jdbc:postgresql://nussknacker_postgres:5432/world-db,db-query), CannotCreateObjectError(No suitable driver found for jdbc:postgresql://nussknacker_postgres:5432/world-db,db-query), ExpressionParseError(Unresolved reference 'output',variable,Some($expression),#output.^[name == #input.clientId])
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalArgumentException: Compilation errors: CannotCreateObjectError(No suitable driver found for jdbc:postgresql://nussknacker_postgres:5432/world-db,db-query), CannotCreateObjectError(No suitable driver found for jdbc:postgresql://nussknacker_postgres:5432/world-db,db-query), ExpressionParseError(Unresolved reference 'output',variable,Some($expression),#output.^[name == #input.clientId])
at pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData.validateOrFail(FlinkProcessCompilerData.scala:59)
at pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData.compileProcess(FlinkProcessCompilerData.scala:68)
at pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar.register(FlinkProcessRegistrar.scala:136)
at pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar.$anonfun$register$1(FlinkProcessRegistrar.scala:61)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at pl.touk.nussknacker.engine.util.ThreadUtils$.withThisAsContextClassLoader(ThreadUtils.scala:11)
at pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar.usingRightClassloader(FlinkProcessRegistrar.scala:71)
at pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar.register(FlinkProcessRegistrar.scala:50)
at pl.touk.nussknacker.engine.process.runner.FlinkStreamingProcessMain$.runProcess(FlinkStreamingProcessMain.scala:28)
at pl.touk.nussknacker.engine.process.runner.FlinkStreamingProcessMain$.runProcess(FlinkStreamingProcessMain.scala:14)
at pl.touk.nussknacker.engine.process.runner.FlinkProcessMain.main(FlinkProcessMain.scala:29)
at pl.touk.nussknacker.engine.process.runner.FlinkProcessMain.main$(FlinkProcessMain.scala:18)
at pl.touk.nussknacker.engine.process.runner.FlinkStreamingProcessMain$.main(FlinkStreamingProcessMain.scala:14)
at pl.touk.nussknacker.engine.process.runner.FlinkStreamingProcessMain.main(FlinkStreamingProcessMain.scala)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 12 more
```
##### Solution:

Add the appropriate database driver to your `/opt/flink/lib` directory

##### Problem: Database driver is missing in nussknacker application - designer
```
Could not create db-query: No suitable driver found for jdbc:postgresql://nussknacker_postgres:5432/world-db
```
##### Solution:

Add the appropriate database driver to your `/opt/nussknacker/lib` directory
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pl.touk.nussknacker.sql.DatabaseEnricherComponentProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package pl.touk.nussknacker.sql

import com.typesafe.config.Config
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.sql.service.{DatabaseLookupEnricher, DatabaseQueryEnricher}

/**
  * Component provider registered under the name `databaseEnricher`.
  *
  * From the `config` section of the component configuration it reads two entries,
  * `databaseQueryEnricher` and `databaseLookupEnricher`, and exposes one component for each of them.
  * Both entries are read unconditionally, so both have to be present.
  */
class DatabaseEnricherComponentProvider extends ComponentProvider {

  override val providerName: String = "databaseEnricher"

  /** The configuration is passed through unchanged. */
  override def resolveConfigForExecution(config: Config): Config = config

  /**
    * Builds the query and lookup enricher definitions.
    *
    * @param config       component configuration; must contain a `config` section
    * @param dependencies not used by this provider
    * @return the query enricher definition followed by the lookup enricher definition
    */
  override def create(config: Config, dependencies: ProcessObjectDependencies): List[ComponentDefinition] = {
    val enrichersSection = config.getConfig("config")
    val query = enrichersSection.as[DbEnricherConfig]("databaseQueryEnricher")
    val lookup = enrichersSection.as[DbEnricherConfig]("databaseLookupEnricher")
    List(
      ComponentDefinition(name = query.name, component = new DatabaseQueryEnricher(query.dbPool)),
      ComponentDefinition(name = lookup.name, component = new DatabaseLookupEnricher(lookup.dbPool))
    )
  }

  /** Declares compatibility with every Nussknacker version. */
  override def isCompatible(version: NussknackerVersion): Boolean = true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package pl.touk.nussknacker.sql

import pl.touk.nussknacker.sql.db.pool.DBPoolConfig

/**
  * Configuration of a single database enricher.
  *
  * @param name   name under which the enricher component is registered
  * @param dbPool connection pool settings used by the enricher
  */
case class DbEnricherConfig(
  name: String,
  dbPool: DBPoolConfig
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package pl.touk.nussknacker.sql.db

import java.sql.{Connection, PreparedStatement}

/**
  * Mix-in that runs code against a `PreparedStatement` obtained from a connection supplier,
  * closing the statement and the connection afterwards.
  */
trait WithDBConnectionPool {

  /** Supplier of connections; invoked once per `withConnection` call. */
  val getConnection: () => Connection

  /**
    * Prepares `query` on a freshly obtained connection and applies `f` to the statement.
    * The statement is closed first and the connection second, also when `f` throws.
    *
    * @param query SQL text passed to `Connection.prepareStatement`
    * @param f     function run with the prepared statement
    * @return the value returned by `f`
    */
  def withConnection[T](query: String)(f: PreparedStatement => T): T =
    borrowConnection { connection =>
      val prepared = connection.prepareStatement(query)
      try {
        f(prepared)
      } finally {
        prepared.close()
      }
    }

  // Obtains a connection, runs `use` with it and always closes the connection.
  private def borrowConnection[T](use: Connection => T): T = {
    val connection = getConnection()
    try {
      use(connection)
    } finally {
      connection.close()
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package pl.touk.nussknacker.sql.db.pool

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{Duration, FiniteDuration}

/**
  * Settings of a JDBC connection pool.
  *
  * @param driverClassName      database driver class name
  * @param url                  JDBC URL of the database
  * @param username             authentication username
  * @param password             authentication password
  * @param initialSize          minimum idle size of the pool (0 by default)
  * @param maxTotal             maximum pool size (10 by default)
  * @param timeout              connection timeout (30 seconds by default)
  * @param connectionProperties additional properties, empty by default
  */
case class DBPoolConfig(
  driverClassName: String,
  url: String,
  username: String,
  password: String,
  initialSize: Int = 0,
  maxTotal: Int = 10,
  timeout: Duration = new FiniteDuration(30L, TimeUnit.SECONDS),
  connectionProperties: Map[String, String] = Map.empty
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package pl.touk.nussknacker.sql.db.pool

import com.typesafe.config.Config

/** Reads named connection pool definitions from the `dbPools` configuration path. */
object DBPoolsConfig {

  import net.ceedubs.ficus.Ficus._
  import net.ceedubs.ficus.readers.ArbitraryTypeReader._

  /** Configuration path under which the pools are looked up. */
  final val configPath: String = "dbPools"

  /**
    * @param config configuration to read from
    * @return pool settings keyed by pool name, or an empty map when `configPath` is absent
    */
  def apply(config: Config): Map[String, DBPoolConfig] = {
    val configuredPools = config.getAs[Map[String, DBPoolConfig]](configPath)
    configuredPools.getOrElse(Map.empty)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package pl.touk.nussknacker.sql.db.pool

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

/** Creates a `HikariDataSource` from a [[DBPoolConfig]]. */
object HikariDataSourceFactory {

  /**
    * @param conf pool settings
    * @return a new data source built from `conf`
    */
  def apply(conf: DBPoolConfig): HikariDataSource =
    new HikariDataSource(toHikariConfig(conf))

  // Copies the pool settings onto a fresh HikariConfig; setters are invoked in the same order as before.
  private def toHikariConfig(conf: DBPoolConfig): HikariConfig = {
    val settings = new HikariConfig()
    settings.setJdbcUrl(conf.url)
    settings.setUsername(conf.username)
    settings.setPassword(conf.password)
    settings.setMinimumIdle(conf.initialSize)
    settings.setMaximumPoolSize(conf.maxTotal)
    settings.setConnectionTimeout(conf.timeout.toMillis)
    settings.setDriverClassName(conf.driverClassName)
    // every extra connection property is forwarded as a data source property
    conf.connectionProperties.foreach { property =>
      settings.addDataSourceProperty(property._1, property._2)
    }
    settings
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package pl.touk.nussknacker.sql.db.query

/**
  * Arguments of a single query.
  *
  * @param value the arguments
  */
case class QueryArguments(
  value: List[QueryArgument]
)

/**
  * A single query argument.
  *
  * @param index index of the argument - presumably the parameter position in the prepared statement; TODO confirm
  * @param value argument value
  */
case class QueryArgument(
  index: Int,
  value: Any
)
Loading