From 3f41bd6cdf2478ace9ead6df6dc4c9a4f62e3526 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Wed, 4 Apr 2018 11:59:59 +0300
Subject: [PATCH 1/5] Custom sink provider for structured streaming

---
 .../hbase/HBaseStreamSinkProvider.scala | 60 +++++++++++++++++++
 1 file changed, 60 insertions(+)
 create mode 100644 core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseStreamSinkProvider.scala

diff --git a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseStreamSinkProvider.scala b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseStreamSinkProvider.scala
new file mode 100644
index 00000000..9a807e23
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseStreamSinkProvider.scala
@@ -0,0 +1,60 @@
+package org.apache.spark.sql.execution.datasources.hbase
+
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+class HBaseStreamSink(options: Map[String, String]) extends Sink with Logging {
+
+  val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"
+  // String with HBaseTableCatalog.tableCatalog
+  private val hBaseCatalog =
+    options.get("hbasecatalog").map(_.toString).getOrElse("")
+
+  if (hBaseCatalog.isEmpty)
+    throw new IllegalArgumentException(
+      "hbasecatalog - variable must be specified in option")
+
+  private val newTableCount =
+    options.get("newtablecount").map(_.toString).getOrElse("5")
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
+
+    /** As per SPARK-16020 arbitrary transformations are not supported, but
+      * converting to an RDD allows us to do magic.
+      */
+    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
+    df.write
+      .options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog,
+        HBaseTableCatalog.newTable -> newTableCount))
+      .format(defaultFormat)
+      .save()
+  }
+}
+
+/**
+ * In option must be specified string with HBaseTableCatalog.tableCatalog
+ * {{{
+ * inputDF.
+ * writeStream. *
+ * format("org.apache.spark.sql.execution.datasources.hbase.HBaseStreamSinkProvider").
+ * option("checkpointLocation", checkPointProdPath).
+ * option("hbasecatalog", catalog).
+ * outputMode(OutputMode.Update()).
+ * trigger(Trigger.ProcessingTime(30.seconds)).
+ * start
+ * }}}
+ */
+class HBaseStreamSinkProvider
+    extends StreamSinkProvider
+    with DataSourceRegister {
+  def createSink(sqlContext: SQLContext,
+                 parameters: Map[String, String],
+                 partitionColumns: Seq[String],
+                 outputMode: OutputMode): Sink = {
+    new HBaseStreamSink(parameters)
+  }
+
+  def shortName(): String = "hbase"
+}

From ce833559e8c2e4081f451c4a916a0badcb06b1f8 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Tue, 10 Apr 2018 13:36:16 +0300
Subject: [PATCH 2/5] Add support for all SHC HBase settings by passing them
 through the options map

---
 .../hbase/HBaseStreamSinkProvider.scala | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseStreamSinkProvider.scala b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseStreamSinkProvider.scala
index 9a807e23..99074e90 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseStreamSinkProvider.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseStreamSinkProvider.scala
@@ -5,19 +5,18 @@ import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.{DataFrame, SQLContext}
 
-class HBaseStreamSink(options: Map[String, String]) extends Sink with Logging {
+class HBaseStreamSink(parameters: Map[String, String])
+    extends Sink
+    with Logging {
 
   val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"
   // String with HBaseTableCatalog.tableCatalog
   private val hBaseCatalog =
-    options.get("hbasecatalog").map(_.toString).getOrElse("")
+    parameters.get(HBaseTableCatalog.tableCatalog).map(_.toString).getOrElse("")
 
   if (hBaseCatalog.isEmpty)
     throw new IllegalArgumentException(
-      "hbasecatalog - variable must be specified in option")
-
-  private val newTableCount =
-    options.get("newtablecount").map(_.toString).getOrElse("5")
+      "HBaseTableCatalog.tableCatalog - must be specified in option")
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
 
@@ -25,9 +24,9 @@ class HBaseStreamSink(options: Map[String, String]) extends Sink with Logging {
       * converting to an RDD allows us to do magic.
       */
     val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
+
     df.write
-      .options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog,
-        HBaseTableCatalog.newTable -> newTableCount))
+      .options(parameters)
      .format(defaultFormat)
       .save()
   }
@@ -40,7 +39,7 @@ class HBaseStreamSink(options: Map[String, String]) extends Sink with Logging {
  * writeStream. *
  * format("org.apache.spark.sql.execution.datasources.hbase.HBaseStreamSinkProvider").
  * option("checkpointLocation", checkPointProdPath).
- * option("hbasecatalog", catalog).
+ * options(Map("schema_array"->schema_array,"schema_record"->schema_record, HBaseTableCatalog.tableCatalog->catalog)).
  * outputMode(OutputMode.Update()).
  * trigger(Trigger.ProcessingTime(30.seconds)).
  * start

From 526c03fc5b64858b8bd5d35c603ff15fb839da8c Mon Sep 17 00:00:00 2001
From: Your Name
Date: Thu, 12 Apr 2018 19:43:33 +0300
Subject: [PATCH 3/5] After review: registered the sink provider under the
 short name "hbase"; all HBase-related options (e.g. catalog, newtable) are
 now read via the "hbase." prefix

---
 core/pom.xml                                  |  6 ++++
 ...pache.spark.sql.sources.DataSourceRegister |  1 +
 .../HBaseStreamSinkProvider.scala             | 33 +++++++++++++------
 3 files changed, 30 insertions(+), 10 deletions(-)
 create mode 100644 core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 rename core/src/main/scala/org/apache/spark/sql/execution/{datasources/hbase => streaming}/HBaseStreamSinkProvider.scala (63%)

diff --git a/core/pom.xml b/core/pom.xml
index 77a53f25..e09b9994 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -184,6 +184,12 @@
+      <resources>
+        <resource>
+          <directory>src/main/resources/META-INF/</directory>
+          <targetPath>META-INF</targetPath>
+        </resource>
+      </resources>
diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 00000000..1d187895
--- /dev/null
+++ b/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.execution.streaming.HBaseStreamSinkProvider
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseStreamSinkProvider.scala b/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala
similarity index 63%
rename from core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseStreamSinkProvider.scala
rename to core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala
index 99074e90..5c5976a9 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseStreamSinkProvider.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala
@@ -1,6 +1,9 @@
-package org.apache.spark.sql.execution.datasources.hbase
+package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.execution.datasources.hbase.{
+  HBaseTableCatalog,
+  Logging
+}
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -9,14 +12,24 @@ class HBaseStreamSink(parameters: Map[String, String])
     extends Sink
     with Logging {
 
-  val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"
-  // String with HBaseTableCatalog.tableCatalog
+  private val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"
+
+  private val hbaseOptionPrefix = "hbase."
+
+  private val hbaseSettings = parameters.filterKeys(
+    _.toLowerCase matches hbaseOptionPrefix + "*") map {
+    case (k, v) => (k.replace(hbaseOptionPrefix, ""), v)
+  }
+
   private val hBaseCatalog =
-    parameters.get(HBaseTableCatalog.tableCatalog).map(_.toString).getOrElse("")
+    hbaseSettings
+      .get(HBaseTableCatalog.tableCatalog)
+      .map(_.toString)
+      .getOrElse("")
 
   if (hBaseCatalog.isEmpty)
     throw new IllegalArgumentException(
-      "HBaseTableCatalog.tableCatalog - must be specified in option")
+      "hbase.catalog - must be specified in option")
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
 
@@ -26,7 +39,7 @@ class HBaseStreamSink(parameters: Map[String, String])
     val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
 
     df.write
-      .options(parameters)
+      .options(hbaseSettings)
       .format(defaultFormat)
       .save()
   }
@@ -36,10 +49,10 @@ class HBaseStreamSink(parameters: Map[String, String])
  * In option must be specified string with HBaseTableCatalog.tableCatalog
  * {{{
  * inputDF.
- * writeStream. *
- * format("org.apache.spark.sql.execution.datasources.hbase.HBaseStreamSinkProvider").
+ * writeStream.
+ * format("hbase").
  * option("checkpointLocation", checkPointProdPath).
- * options(Map("schema_array"->schema_array,"schema_record"->schema_record, HBaseTableCatalog.tableCatalog->catalog)).
+ * options(Map("hbase.schema_array"->schema_array,"hbase.schema_record"->schema_record, hbase..catalog->catalog)).
  * outputMode(OutputMode.Update()).
  * trigger(Trigger.ProcessingTime(30.seconds)).
  * start

From e90c7910c096b0b38cae10a123543e7a3c4da09d Mon Sep 17 00:00:00 2001
From: Your Name
Date: Thu, 12 Apr 2018 22:19:33 +0300
Subject: [PATCH 4/5] Minor fix in comments

---
 .../streaming/HBaseStreamSinkProvider.scala | 24 ++++++++++++-------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala b/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala
index 5c5976a9..66cf1817 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala
@@ -12,6 +12,8 @@ class HBaseStreamSink(parameters: Map[String, String])
     extends Sink
     with Logging {
 
+  @volatile private var latestBatchId = -1L
+
   private val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"
 
   private val hbaseOptionPrefix = "hbase."
@@ -32,16 +34,20 @@ class HBaseStreamSink(parameters: Map[String, String])
       "hbase.catalog - must be specified in option")
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
+    if (batchId <= latestBatchId) {
+      logInfo(s"Skipping already committed batch $batchId")
+    } else {
 
-    /** As per SPARK-16020 arbitrary transformations are not supported, but
-      * converting to an RDD allows us to do magic.
-      */
-    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
+      /** As per SPARK-16020 arbitrary transformations are not supported, but
+        * converting to an RDD allows us to do magic.
+        */
+      val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
 
-    df.write
-      .options(hbaseSettings)
-      .format(defaultFormat)
-      .save()
+      df.write
+        .options(hbaseSettings)
+        .format(defaultFormat)
+        .save()
+    }
   }
 }
 
@@ -52,7 +58,7 @@ class HBaseStreamSink(parameters: Map[String, String])
  * writeStream.
  * format("hbase").
  * option("checkpointLocation", checkPointProdPath).
- * options(Map("hbase.schema_array"->schema_array,"hbase.schema_record"->schema_record, hbase..catalog->catalog)).
+ * options(Map("hbase.schema_array"->schema_array,"hbase.schema_record"->schema_record, hbase.catalog->catalog)).
  * outputMode(OutputMode.Update()).
  * trigger(Trigger.ProcessingTime(30.seconds)).
  * start

From d2001a5f80db3f08c5be636feab26ef936342116 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Fri, 13 Apr 2018 16:21:35 +0300
Subject: [PATCH 5/5] Fix "spark counters" and the "hbase." prefix parser

---
 .../streaming/HBaseStreamSinkProvider.scala | 53 +++++++++----------
 1 file changed, 24 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala b/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala
index 66cf1817..2bdcbb19 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala
@@ -1,50 +1,45 @@
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.sql.execution.datasources.hbase.{
-  HBaseTableCatalog,
-  Logging
-}
+import java.util.Locale
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.execution.datasources.hbase.Logging
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
-class HBaseStreamSink(parameters: Map[String, String])
+class HBaseStreamSink(sqlContext: SQLContext,
+                      parameters: Map[String, String],
+                      partitionColumns: Seq[String],
+                      outputMode: OutputMode)
     extends Sink
     with Logging {
 
   @volatile private var latestBatchId = -1L
 
   private val defaultFormat = "org.apache.spark.sql.execution.datasources.hbase"
+  private val prefix = "hbase."
 
-  private val hbaseOptionPrefix = "hbase."
-
-  private val hbaseSettings = parameters.filterKeys(
-    _.toLowerCase matches hbaseOptionPrefix + "*") map {
-    case (k, v) => (k.replace(hbaseOptionPrefix, ""), v)
-  }
-
-  private val hBaseCatalog =
-    hbaseSettings
-      .get(HBaseTableCatalog.tableCatalog)
-      .map(_.toString)
-      .getOrElse("")
-
-  if (hBaseCatalog.isEmpty)
-    throw new IllegalArgumentException(
-      "hbase.catalog - must be specified in option")
+  private val specifiedHBaseParams = parameters
+    .keySet
+    .filter(_.toLowerCase(Locale.ROOT).startsWith(prefix))
+    .map { k => k.drop(prefix.length).toString -> parameters(k) }
+    .toMap
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
     if (batchId <= latestBatchId) {
       logInfo(s"Skipping already committed batch $batchId")
     } else {
+      // use a local variable to make sure the map closure doesn't capture the whole DataFrame
+      val schema = data.schema
+      val res = data.queryExecution.toRdd.mapPartitions { rows =>
+        val converter = CatalystTypeConverters.createToScalaConverter(schema)
+        rows.map(converter(_).asInstanceOf[Row])
+      }
 
-      /** As per SPARK-16020 arbitrary transformations are not supported, but
-        * converting to an RDD allows us to do magic.
-        */
-      val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
-
+      val df = sqlContext.sparkSession.createDataFrame(res, schema)
       df.write
-        .options(hbaseSettings)
+        .options(specifiedHBaseParams)
         .format(defaultFormat)
         .save()
     }
@@ -71,7 +66,7 @@ class HBaseStreamSinkProvider
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): Sink = {
-    new HBaseStreamSink(parameters)
+    new HBaseStreamSink(sqlContext, parameters, partitionColumns, outputMode)
   }
 
   def shortName(): String = "hbase"
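
Usage sketch: with the provider registered under the short name "hbase" (patch 3) and options forwarded via the "hbase." prefix (patch 5), a streaming write could be wired up roughly as below. This is only a sketch, not part of the patches: the catalog JSON, table and column names, checkpoint path, and the rate source standing in for real input are placeholder assumptions for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

import scala.concurrent.duration._

object HBaseStreamSinkUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hbase-stream-sink-sketch").getOrCreate()

    // Hypothetical SHC catalog: HBase table "events" with a string rowkey
    // and a single value column in column family "d".
    val catalog =
      """{
        |  "table": {"namespace": "default", "name": "events"},
        |  "rowkey": "key",
        |  "columns": {
        |    "key":   {"cf": "rowkey", "col": "key",   "type": "string"},
        |    "value": {"cf": "d",      "col": "value", "type": "string"}
        |  }
        |}""".stripMargin

    // Placeholder input; any streaming DataFrame whose columns match the catalog would do.
    val inputDF = spark.readStream
      .format("rate")
      .load()
      .selectExpr("CAST(value AS STRING) AS key", "CAST(timestamp AS STRING) AS value")

    val query = inputDF.writeStream
      .format("hbase")                                       // resolved through DataSourceRegister
      .option("checkpointLocation", "/tmp/hbase-sink-ckpt")  // placeholder path
      .option("hbase.catalog", catalog)                      // "hbase." prefix is stripped by the sink
      .option("hbase.newtable", "5")                         // forwarded to SHC as "newtable"
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime(30.seconds))
      .start()

    query.awaitTermination()
  }
}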