diff --git a/build.sbt b/build.sbt index 22e853b0bd2..bc814de10a0 100644 --- a/build.sbt +++ b/build.sbt @@ -12,12 +12,11 @@ assemblyMergeStrategy in assembly := { case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last case PathList("com", "codahale", xs @ _*) => MergeStrategy.last case PathList("com", "yammer", xs @ _*) => MergeStrategy.last - case "about.html" => MergeStrategy.rename - case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last - case "META-INF/mailcap" => MergeStrategy.last - case "META-INF/mimetypes.default" => MergeStrategy.last case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first + case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first + case PathList(ps @ _*) if ps.last endsWith ".thrift" => MergeStrategy.first + case "UnusedStubClass.class" => MergeStrategy.last case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) diff --git a/docs/zh-cn/configuration/_sidebar.md b/docs/zh-cn/configuration/_sidebar.md index af190591646..738b4defc0e 100644 --- a/docs/zh-cn/configuration/_sidebar.md +++ b/docs/zh-cn/configuration/_sidebar.md @@ -15,6 +15,8 @@ - [Hdfs](/zh-cn/configuration/input-plugins/Hdfs) - [HdfsStream](/zh-cn/configuration/input-plugins/HdfsStream) - [KafkaStream](/zh-cn/configuration/input-plugins/KafkaStream) + - [Kudu](/zh-cn/configuration/input-plugins/Kudu) + - [MongoDB](/zh-cn/configuration/input-plugins/MongoDB) - [S3Stream](/zh-cn/configuration/input-plugins/S3Stream) - [SocketStream](/zh-cn/configuration/input-plugins/SocketStream) @@ -49,6 +51,8 @@ - [Hdfs](/zh-cn/configuration/output-plugins/Hdfs) - [Jdbc](/zh-cn/configuration/output-plugins/Jdbc) - [Kafka](/zh-cn/configuration/output-plugins/Kafka) + - [Kudu](/zh-cn/configuration/output-plugins/Kudu) + - [MongoDB](/zh-cn/configuration/output-plugins/MongoDB) - [MySQL](/zh-cn/configuration/output-plugins/MySQL) - [S3](/zh-cn/configuration/output-plugins/S3) - [Stdout](/zh-cn/configuration/output-plugins/Stdout) diff --git a/docs/zh-cn/configuration/input-plugins/Hive.docs b/docs/zh-cn/configuration/input-plugins/Hive.docs new file mode 100644 index 00000000000..15c2a457a56 --- /dev/null +++ b/docs/zh-cn/configuration/input-plugins/Hive.docs @@ -0,0 +1,11 @@ +@waterdropPlugin +@pluginGroup input +@pluginName Hive +@pluginDesc "从hive读取原始数据" +@pluginAuthor InterestingLab +@pluginHomepage https://interestinglab.github.io/waterdrop +@pluginVersion 1.0.0 + +@pluginOption +string pre_sql yes "进行预处理的sql, 如果不需要预处理,可以使用select * from hive_db.hive_table" +string table_name yes "预处理sql的到数据注册成的临时表名" \ No newline at end of file diff --git a/docs/zh-cn/configuration/input-plugins/Hive.md b/docs/zh-cn/configuration/input-plugins/Hive.md new file mode 100644 index 00000000000..9e1ba8a4b93 --- /dev/null +++ b/docs/zh-cn/configuration/input-plugins/Hive.md @@ -0,0 +1,40 @@ +## Input plugin : Hive + +* Author: InterestingLab +* Homepage: https://interestinglab.github.io/waterdrop +* Version: 1.0.0 + +### Description + +从hive中获取数据, + +### Options + +| name | type | required | default value | +| --- | --- | --- | --- | +| [pre_sql](#pre_sql-string) | string | yes | - | +| [table_name](#table_name-string) | string | yes | - | + + +##### pre_sql [string] + +进行预处理的sql, 如果不需要预处理,可以使用select * from hive_db.hive_table + +##### table_name [string] + +经过pre_sql获取到的数据,注册成临时表的表名 + + + +### Example + +``` +hive { + pre_sql = "select * from mydb.mytb" + table_name = "myTable" +} +``` + 
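For reference, `pre_sql` and `table_name` map directly onto two SparkSession calls. A minimal Scala sketch (it mirrors the input/Hive.scala added further down in this diff; the method name here is illustrative only, and it relies on the `enableHiveSupport()` call that the Waterdrop.scala change below adds):

```
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Sketch of how the Hive input resolves its two options
// (same calls as waterdrop-core/input/Hive.scala later in this diff).
def readHive(spark: SparkSession, preSql: String, tableName: String): Dataset[Row] = {
  // run the pre-processing SQL against Hive; needs a Hive-enabled SparkSession
  val ds = spark.sql(preSql)
  // register the result so downstream filter/output plugins can reference it by table_name
  ds.createOrReplaceTempView(tableName)
  ds
}
```
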
+### Notes +cluster和client模式下必须把hadoopConf和hive-site.xml置于集群每个节点sparkconf目录下,本地调试将其放在resources目录 + diff --git a/docs/zh-cn/configuration/input-plugins/Kudu.docs b/docs/zh-cn/configuration/input-plugins/Kudu.docs new file mode 100644 index 00000000000..2560294b469 --- /dev/null +++ b/docs/zh-cn/configuration/input-plugins/Kudu.docs @@ -0,0 +1,12 @@ +@waterdropPlugin +@pluginGroup input +@pluginName Kudu +@pluginDesc "从[Apache Kudu](https://kudu.apache.org) 表中读取数据" +@pluginAuthor InterestingLab +@pluginHomepage https://interestinglab.github.io/waterdrop +@pluginVersion 1.0.0 + +@pluginOption +string kudu_master yes "kudu的master,多个master以逗号隔开" +string kudu_table yes "kudu要读取的表名" +string table_name yes "获取到数据注册成的临时表名" \ No newline at end of file diff --git a/docs/zh-cn/configuration/input-plugins/Kudu.md b/docs/zh-cn/configuration/input-plugins/Kudu.md new file mode 100644 index 00000000000..76e8742143e --- /dev/null +++ b/docs/zh-cn/configuration/input-plugins/Kudu.md @@ -0,0 +1,42 @@ +## Input plugin : Kudu + +* Author: InterestingLab +* Homepage: https://interestinglab.github.io/waterdrop +* Version: 1.0.0 + +### Description + +从[Apache Kudu](https://kudu.apache.org) 表中读取数据. + +### Options + +| name | type | required | default value | +| --- | --- | --- | --- | +| [kudu_master](#kudu_master-string) | string | yes | - | +| [kudu_table](#kudu_table) | string | yes | - | +| [table_name](#table_name-string) | string | yes | - | + + +##### kudu_master [string] + +kudu的master,多个master以逗号隔开 + +##### kudu_table [string] + +kudu中要读取的表名 + +##### table_name [string] + +获取到的数据,注册成临时表的表名 + + + +### Example + +``` +kudu{ + kudu_master="hadoop01:7051,hadoop02:7051,hadoop03:7051" + kudu_table="my_kudu_table" + table_name="reg_table" + } +``` diff --git a/docs/zh-cn/configuration/input-plugins/MongoDB.docs b/docs/zh-cn/configuration/input-plugins/MongoDB.docs new file mode 100644 index 00000000000..9e4337066cf --- /dev/null +++ b/docs/zh-cn/configuration/input-plugins/MongoDB.docs @@ -0,0 +1,13 @@ +@waterdropPlugin +@pluginGroup input +@pluginName MongoDB +@pluginDesc "从[MongoDB](https://www.mongodb.com/)读取数据" +@pluginAuthor InterestingLab +@pluginHomepage https://interestinglab.github.io/waterdrop +@pluginVersion 1.0.0 + +@pluginOption +string readConfig.uri yes "mongoDB uri" +string readConfig.database yes "要读取的database" +string readConfig.collection yes "要读取的collection" +string table_name yes "读取数据注册成的临时表名" \ No newline at end of file diff --git a/docs/zh-cn/configuration/input-plugins/MongoDB.md b/docs/zh-cn/configuration/input-plugins/MongoDB.md new file mode 100644 index 00000000000..9db129fe7d9 --- /dev/null +++ b/docs/zh-cn/configuration/input-plugins/MongoDB.md @@ -0,0 +1,54 @@ +## Input plugin : MongoDB + +* Author: InterestingLab +* Homepage: https://interestinglab.github.io/waterdrop +* Version: 1.0.0 + +### Description + +从[MongoDB](https://www.mongodb.com/)读取数据 + +### Options + +| name | type | required | default value | +| --- | --- | --- | --- | +| [readconfig.uri](#readconfig.uri-string) | string | yes | - | +| [readconfig.database](#readconfig.database-string) | string | yes | - | +| [readconfig.collection](#readconfig.collection-string) | string | yes | - | +| [readconfig.*](#readconfig.*-string) | string | no | - | +| [table_name](#table_name-string) | string | yes | - | + + +##### readconfig.uri [string] + +要读取mongoDB的uri + +##### readconfig.database [string] + +要读取mongoDB的database + +##### readconfig.collection [string] + +要读取mongoDB的collection + +#### readconfig + 
+这里还可以配置更多其他参数,详见https://docs.mongodb.com/spark-connector/v1.1/configuration/, 参见其中的`Input Configuration`部分 +指定参数的方式是在原参数名称上加上前缀"readconfig." 如设置`spark.mongodb.input.partitioner`的方式是 `readconfig.spark.mongodb.input.partitioner="MongoPaginateBySizePartitioner"`。如果不指定这些非必须参数,将使用MongoDB官方文档的默认值 + +##### table_name [string] + +从mongoDB获取到的数据,注册成临时表的表名 + + +### Example + +``` +mongodb{ + readconfig.uri="mongodb://myhost:mypost" + readconfig.database="mydatabase" + readconfig.collection="mycollection" + readconfig.spark.mongodb.input.partitioner = "MongoPaginateBySizePartitioner" + table_name = "test" + } +``` diff --git a/docs/zh-cn/configuration/output-plugins/Kudu.docs b/docs/zh-cn/configuration/output-plugins/Kudu.docs new file mode 100644 index 00000000000..3e281551ad5 --- /dev/null +++ b/docs/zh-cn/configuration/output-plugins/Kudu.docs @@ -0,0 +1,12 @@ +@waterdropPlugin +@pluginGroup output +@pluginName Kudu +@pluginDesc "写入数据到[Apache Kudu](https://kudu.apache.org)表中" +@pluginAuthor InterestingLab +@pluginHomepage https://interestinglab.github.io/waterdrop +@pluginVersion 1.0.0 + +@pluginOption +string kudu_master yes "kudu的master,多个master以逗号隔开" +string kudu_table yes "kudu中要写入的表名,表必须已经存在" +string mode="insert" no "写入kudu模式 insert|update|upsert|insertIgnore" \ No newline at end of file diff --git a/docs/zh-cn/configuration/output-plugins/Kudu.md b/docs/zh-cn/configuration/output-plugins/Kudu.md new file mode 100644 index 00000000000..346fd50e203 --- /dev/null +++ b/docs/zh-cn/configuration/output-plugins/Kudu.md @@ -0,0 +1,43 @@ +## Output plugin : Kudu + +* Author: InterestingLab +* Homepage: https://interestinglab.github.io/waterdrop +* Version: 1.0.0 + +### Description + +写入数据到[Apache Kudu](https://kudu.apache.org)表中 + +### Options + +| name | type | required | default value | +| --- | --- | --- | --- | +| [kudu_master](#kudu_master-string) | string | yes | - | +| [kudu_table](#kudu_table) | string | yes | - | +| [mode](#mode-string) | string | no | insert | + + +##### kudu_master [string] + +kudu的master,多个master以逗号隔开 + +##### kudu_table [string] + +kudu中要写入的表名,表必须已经存在 + +##### mode [string] + +写入kudu中采取的模式,支持 insert|update|upsert|insertIgnore,默认为insert +insert和insertIgnore :insert在遇见主键冲突将会报错,insertIgnore不会报错,将会舍弃这条数据 +update和upsert :update找不到要更新的主键将会报错,upsert不会,将会把这条数据插入 + + +### Example + +``` +kudu{ + kudu_master="hadoop01:7051,hadoop02:7051,hadoop03:7051" + kudu_table="my_kudu_table" + mode="upsert" + } +``` diff --git a/docs/zh-cn/configuration/output-plugins/MongoDB.docs b/docs/zh-cn/configuration/output-plugins/MongoDB.docs new file mode 100644 index 00000000000..7a5d5bc337a --- /dev/null +++ b/docs/zh-cn/configuration/output-plugins/MongoDB.docs @@ -0,0 +1,12 @@ +@waterdropPlugin +@pluginGroup output +@pluginName MongoDB +@pluginDesc "写入数据到[MongoDB](https://www.mongodb.com/)" +@pluginAuthor InterestingLab +@pluginHomepage https://interestinglab.github.io/waterdrop +@pluginVersion 1.0.0 + +@pluginOption +string readConfig.uri yes "mongoDB uri" +string readConfig.database yes "要写入的database" +string readConfig.collection yes "要写入的collection" \ No newline at end of file diff --git a/docs/zh-cn/configuration/output-plugins/MongoDB.md b/docs/zh-cn/configuration/output-plugins/MongoDB.md new file mode 100644 index 00000000000..2c79571dfe7 --- /dev/null +++ b/docs/zh-cn/configuration/output-plugins/MongoDB.md @@ -0,0 +1,49 @@ +## Output plugin : MongoDB + +* Author: InterestingLab +* Homepage: https://interestinglab.github.io/waterdrop +* Version: 1.0.0 + +### Description + 
+写入数据到[MongoDB](https://www.mongodb.com/) + +### Options + +| name | type | required | default value | +| --- | --- | --- | --- | +| [writeconfig.uri](#writeconfig.uri-string) | string | yes | - | +| [writeconfig.database](#writeconfig.database-string) | string | yes | - | +| [writeconfig.collection](#writeconfig.collection-string) | string | yes | - | +| [writeconfig.*](#writeconfig.*-string) | string | no | - | + + + +##### writeconfig.uri [string] + +要写入mongoDB的uri + +##### writeconfig.database [string] + +要写入mongoDB的database + +##### writeconfig.collection [string] + +要写入mongoDB的collection + +#### writeconfig + +这里还可以配置更多其他参数,详见https://docs.mongodb.com/spark-connector/v1.1/configuration/ +, 参见其中的`Output Configuration`部分 +指定参数的方式是在原参数名称上加上前缀"writeconfig." 如设置`localThreshold`的方式是 `writeconfig.localThreshold=20`。如果不指定这些非必须参数,将使用MongoDB官方文档的默认值 + + +### Example + +``` +mongodb{ + writeconfig.uri="mongodb://myhost:mypost" + writeconfig.database="mydatabase" + writeconfig.collection="mycollection" + } +``` diff --git a/waterdrop-core/build.sbt b/waterdrop-core/build.sbt index 60a3ea39d1c..157fe0148b8 100644 --- a/waterdrop-core/build.sbt +++ b/waterdrop-core/build.sbt @@ -33,6 +33,9 @@ libraryDependencies ++= Seq( exclude("org.spark-project.spark", "unused") exclude("net.jpountz.lz4", "unused"), "com.typesafe" % "config" % "1.3.1", + "org.apache.spark" %% "spark-hive" % sparkVersion , + "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0", + "org.apache.kudu" %% "kudu-spark2" % "1.7.0", "com.alibaba" % "QLExpress" % "3.2.0", "com.alibaba" % "fastjson" % "1.2.47", "commons-lang" % "commons-lang" % "2.6", @@ -41,7 +44,7 @@ libraryDependencies ++= Seq( "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.6.3", "com.github.scopt" %% "scopt" % "3.7.0", "org.apache.commons" % "commons-compress" % "1.15", - "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.1.39" excludeAll(ExclusionRule(organization="com.fasterxml.jackson.core")) + "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.1.39" exclude("com.google.guava","guava") excludeAll(ExclusionRule(organization="com.fasterxml.jackson.core")) ) // For binary compatible conflicts, sbt provides dependency overrides. 
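The three new dependencies above (spark-hive, mongo-spark-connector, kudu-spark2) back the Hive, MongoDB and Kudu plugins added later in this diff; the extra `exclude("com.google.guava","guava")` on clickhouse-jdbc presumably avoids a Guava version clash with the Kudu client. A minimal Scala sketch of the connector calls these artifacts provide, with placeholder masters/URIs — it mirrors the input/Kudu.scala and output/MongoDB.scala code below rather than adding anything new:

```
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("connector-smoke-test")   // illustrative app name
  .enableHiveSupport()               // backed by spark-hive, matching the Waterdrop.scala change below
  .getOrCreate()

// kudu-spark2: read a Kudu table into a DataFrame (placeholder master/table)
val kuduDf = spark.read
  .format("org.apache.kudu.spark.kudu")
  .options(Map("kudu.master" -> "kudu-master:7051", "kudu.table" -> "my_kudu_table"))
  .kudu

// mongo-spark-connector: write that DataFrame to MongoDB (placeholder uri/database/collection)
MongoSpark.save(kuduDf, WriteConfig(Map(
  "uri" -> "mongodb://myhost:27017",
  "database" -> "mydatabase",
  "collection" -> "mycollection")))
```
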
diff --git a/waterdrop-core/src/main/resources/META-INF/services/io.github.interestinglab.waterdrop.apis.BaseOutput b/waterdrop-core/src/main/resources/META-INF/services/io.github.interestinglab.waterdrop.apis.BaseOutput index 8deb1e6c7d8..987cfe80d2f 100644 --- a/waterdrop-core/src/main/resources/META-INF/services/io.github.interestinglab.waterdrop.apis.BaseOutput +++ b/waterdrop-core/src/main/resources/META-INF/services/io.github.interestinglab.waterdrop.apis.BaseOutput @@ -6,4 +6,6 @@ io.github.interestinglab.waterdrop.output.Jdbc io.github.interestinglab.waterdrop.output.Kafka io.github.interestinglab.waterdrop.output.Mysql io.github.interestinglab.waterdrop.output.S3 -io.github.interestinglab.waterdrop.output.Stdout \ No newline at end of file +io.github.interestinglab.waterdrop.output.Stdout +io.github.interestinglab.waterdrop.output.MongoDB +io.github.interestinglab.waterdrop.output.Kudu \ No newline at end of file diff --git a/waterdrop-core/src/main/resources/META-INF/services/io.github.interestinglab.waterdrop.apis.BaseStaticInput b/waterdrop-core/src/main/resources/META-INF/services/io.github.interestinglab.waterdrop.apis.BaseStaticInput index ff290da6052..b668dd7ca04 100644 --- a/waterdrop-core/src/main/resources/META-INF/services/io.github.interestinglab.waterdrop.apis.BaseStaticInput +++ b/waterdrop-core/src/main/resources/META-INF/services/io.github.interestinglab.waterdrop.apis.BaseStaticInput @@ -1,3 +1,6 @@ io.github.interestinglab.waterdrop.input.Fake2 io.github.interestinglab.waterdrop.input.Hdfs -io.github.interestinglab.waterdrop.input.File \ No newline at end of file +io.github.interestinglab.waterdrop.input.File +io.github.interestinglab.waterdrop.input.Hive +io.github.interestinglab.waterdrop.input.MongoDB +io.github.interestinglab.waterdrop.input.Kudu \ No newline at end of file diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/Waterdrop.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/Waterdrop.scala index 9288a741bae..4b92a4541ba 100644 --- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/Waterdrop.scala +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/Waterdrop.scala @@ -172,7 +172,7 @@ object Waterdrop extends Logging { println("\t" + key + " => " + value) }) - val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate() + val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate() // find all user defined UDFs and register in application init UdfRegister.findAndRegisterUdfs(sparkSession) diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/input/Hive.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/input/Hive.scala new file mode 100644 index 00000000000..26e56df8aa5 --- /dev/null +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/input/Hive.scala @@ -0,0 +1,34 @@ +package io.github.interestinglab.waterdrop.input + +import com.typesafe.config.{Config, ConfigFactory} +import io.github.interestinglab.waterdrop.apis.BaseStaticInput +import org.apache.spark.sql.{Dataset, Row, SparkSession} + +class Hive extends BaseStaticInput { + var config: Config = ConfigFactory.empty() + + override def setConfig(config: Config): Unit = { + this.config = config + } + + override def getConfig(): Config = { + this.config + } + + override def checkConfig(): (Boolean, String) = { + config.hasPath("table_name") && config.hasPath("pre_sql") match { + case true => (true, "") + case 
false => (false, "please specify [table_name] and [pre_sql]") + } + } + + + override def getDataset(spark: SparkSession): Dataset[Row] = { + + val regTable = config.getString("table_name") + val ds = spark.sql(config.getString("pre_sql")) + ds.createOrReplaceTempView(s"$regTable") + ds + } + +} diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/input/Kudu.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/input/Kudu.scala new file mode 100644 index 00000000000..42e011526e3 --- /dev/null +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/input/Kudu.scala @@ -0,0 +1,40 @@ +package io.github.interestinglab.waterdrop.input + + +import com.typesafe.config.{Config, ConfigFactory} +import io.github.interestinglab.waterdrop.apis.BaseStaticInput +import org.apache.spark.sql.{Dataset, Row, SparkSession} +import org.apache.kudu.spark.kudu._ + + +class Kudu extends BaseStaticInput { + + var config: Config = ConfigFactory.empty() + + override def setConfig(config: Config): Unit = { + this.config = config + } + + override def getConfig(): Config = { + this.config + } + + override def checkConfig(): (Boolean, String) = { + config.hasPath("kudu_master") && config.hasPath("kudu_table") && config.hasPath("table_name") match { + case true => (true, "") + case false => (false, "please specify [kudu_master] and [kudu_table] and [table_name]") + } + } + + + override def getDataset(spark: SparkSession): Dataset[Row] = { + val mapConf = Map( + "kudu.master" -> config.getString("kudu_master"), + "kudu.table" -> config.getString("kudu_table")) + + val ds = spark.read.format("org.apache.kudu.spark.kudu") + .options(mapConf).kudu + ds.createOrReplaceTempView(config.getString("table_name")) + ds + } +} diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/input/MongoDB.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/input/MongoDB.scala new file mode 100644 index 00000000000..75d8c214117 --- /dev/null +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/input/MongoDB.scala @@ -0,0 +1,60 @@ +package io.github.interestinglab.waterdrop.input + +import com.mongodb.spark.MongoSpark +import com.mongodb.spark.config.ReadConfig +import com.typesafe.config.{Config, ConfigFactory} +import io.github.interestinglab.waterdrop.apis.BaseStaticInput +import org.apache.spark.sql.{Dataset, Row, SparkSession} + +import scala.collection.JavaConversions._ + +class MongoDB extends BaseStaticInput { + + var config: Config = ConfigFactory.empty() + + var readConfig: ReadConfig = _ + + val confPrefix = "readconfig" + + override def setConfig(config: Config): Unit = { + this.config = config + } + + override def getConfig(): Config = { + this.config + } + + override def checkConfig(): (Boolean, String) = { + + config.hasPath(confPrefix) && config.hasPath("table_name") match { + case true => { + val read = config.getConfig(confPrefix) + read.hasPath("uri") && read.hasPath("database") && read.hasPath("collection") match { + case true => (true, "") + case false => (false, "please specify [readconfig.uri] and [readconfig.database] and [readconfig.collection]") + } + } + case false => (false, "please specify [readconfig] and [table_name]") + } + } + + override def prepare(spark: SparkSession): Unit = { + super.prepare(spark) + val map = new collection.mutable.HashMap[String, String] + config + .getConfig(confPrefix) + .entrySet() + .foreach(entry => { + val key = entry.getKey + val value = 
String.valueOf(entry.getValue.unwrapped()) + map.put(key, value) + }) + readConfig = ReadConfig(map) + } + + + override def getDataset(spark: SparkSession): Dataset[Row] = { + MongoSpark.load(spark, readConfig) + } + +} diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/Kudu.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/Kudu.scala new file mode 100644 index 00000000000..0f9e655da8a --- /dev/null +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/Kudu.scala @@ -0,0 +1,49 @@ +package io.github.interestinglab.waterdrop.output + +import com.typesafe.config.{Config, ConfigFactory} +import io.github.interestinglab.waterdrop.apis.{BaseOutput} +import org.apache.kudu.spark.kudu._ +import org.apache.spark.sql.{Dataset, Row} +import scala.collection.JavaConversions._ + + +class Kudu extends BaseOutput { + + var config: Config = ConfigFactory.empty() + + override def setConfig(config: Config): Unit = { + val defaultConfig = ConfigFactory.parseMap( + Map( + "mode" -> "insert" + ) + ) + + this.config = config.withFallback(defaultConfig) + } + + override def getConfig(): Config = { + this.config + } + + override def checkConfig(): (Boolean, String) = { + config.hasPath("kudu_master") && config.hasPath("kudu_table") match { + case true => (true, "") + case false => (false, "please specify [kudu_master] and [kudu_table] ") + } + } + + + override def process(df: Dataset[Row]): Unit = { + + val kuduContext = new KuduContext(config.getString("kudu_master"),df.sparkSession.sparkContext) + + val table = config.getString("kudu_table") + + config.getString("mode") match { + case "insert" => kuduContext.insertRows(df,table) + case "update" => kuduContext.updateRows(df,table) + case "upsert" => kuduContext.upsertRows(df,table) + case "insertIgnore" => kuduContext.insertIgnoreRows(df,table) + } + } +} diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/MongoDB.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/MongoDB.scala new file mode 100644 index 00000000000..49efe06424d --- /dev/null +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/MongoDB.scala @@ -0,0 +1,59 @@ +package io.github.interestinglab.waterdrop.output + +import com.mongodb.spark.MongoSpark +import com.mongodb.spark.config.WriteConfig +import com.typesafe.config.{Config, ConfigFactory} +import io.github.interestinglab.waterdrop.apis.BaseOutput +import org.apache.spark.sql.{Dataset, Row, SparkSession} + +import scala.collection.JavaConversions._ + +class MongoDB extends BaseOutput { + + var config: Config = ConfigFactory.empty() + + val confPrefix = "writeconfig" + + var writeConfig :WriteConfig = _ + + override def setConfig(config: Config): Unit = { + this.config = config + } + + override def getConfig(): Config = { + this.config + } + + + override def checkConfig(): (Boolean, String) = { + + config.hasPath(confPrefix) match { + case true => { + val read = config.getConfig(confPrefix) + read.hasPath("uri") && read.hasPath("database") && read.hasPath("collection") match { + case true => (true, "") + case false => (false, "please specify [writeconfig.uri] and [writeconfig.database] and [writeconfig.collection]") + } + } + case false => (false, "please specify [writeconfig] ") + } + } + + override def prepare(spark: SparkSession): Unit = { + super.prepare(spark) + val map = new collection.mutable.HashMap[String, String] + config + .getConfig(confPrefix) + .entrySet() + 
.foreach(entry => { + val key = entry.getKey + val value = String.valueOf(entry.getValue.unwrapped()) + map.put(key, value) + }) + writeConfig = WriteConfig(map) + } + + override def process(df: Dataset[Row]): Unit = { + MongoSpark.save(df,writeConfig) + } +}
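
One behavioral note on the Kudu output above: the `mode` match in process() has no default case, so an unsupported value (e.g. mode = "UPSERT") surfaces as a scala.MatchError at runtime rather than as a config-check failure. A hedged sketch of a stricter validation — hypothetical, not part of this diff:

```
import com.typesafe.config.Config

// Hypothetical helper, not part of this diff: reject an unsupported mode at
// config-check time instead of letting it reach the write path as a MatchError.
object KuduOutputChecks {
  private val validModes = Set("insert", "update", "upsert", "insertIgnore")

  def check(config: Config): (Boolean, String) = {
    if (!(config.hasPath("kudu_master") && config.hasPath("kudu_table")))
      (false, "please specify [kudu_master] and [kudu_table]")
    else if (config.hasPath("mode") && !validModes.contains(config.getString("mode")))
      (false, "mode must be one of insert|update|upsert|insertIgnore")
    else
      (true, "")
  }
}
```

Wiring a check like this into the plugin's checkConfig() would keep invalid modes from failing mid-batch inside process().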