SHC with Spark Structured Streaming #205

Open
EDALJO opened this issue Dec 7, 2017 · 35 comments

Comments

@EDALJO

EDALJO commented Dec 7, 2017

Hi,

I have a Spark Structured Streaming application in which I'd like to write streaming data to HBase using SHC. It reads data from a location where new CSV files are continuously being created. The catalog I defined works when writing a regular DataFrame with identical data to HBase.
The key components of my streaming application are a DataStreamReader and a DataStreamWriter.

val inputDataStream = spark
      .readStream
      .option("sep", ",")
      .schema(schema)
      .csv("/path/to/data/*.csv")

inputDataStream
      .writeStream
      .outputMode("append")
      .options(
        Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "2"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .start

When running the application I'm getting the following message:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.execution.datasources.hbase does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:285)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:286)
    at my.package.SHCStreamingApplication$.main(SHCStreamingApplication.scala:153)
    at my.package.SHCStreamingApplication.main(SHCStreamingApplication.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Does anyone know a solution or workaround that would still let me use SHC to write Structured Streaming data to HBase?
Thanks in advance!

@sutugin

sutugin commented Mar 14, 2018

You can write a custom sink provider that extends StreamSinkProvider. This is my implementation:

package HBase
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.datasources.hbase._

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {   
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog->hBaseCatalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase").save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
                  sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}

Here is an example of how to use it:

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._  // provides the 30.seconds syntax

inputDF.
   writeStream.
   queryName("hbase writer").
   format("HBase.HBaseSinkProvider").
   option("checkpointLocation", checkPointProdPath).
   option("hbasecat", catalog).
   outputMode(OutputMode.Update()).
   trigger(Trigger.ProcessingTime(30.seconds)).
   start

@EDALJO

EDALJO commented Mar 20, 2018

Thanks for your answer - exactly the type of solution I was looking for. I only had time to test it quickly, but it seems to be working perfectly!

@sutugin

sutugin commented Mar 20, 2018

Excellent, glad to help!!!

@hamroune

hamroune commented Apr 9, 2018

Thank you, this helps a lot.

@merfill

merfill commented Jun 27, 2018

I've implemented your solution with HBaseSinkProvider using the following steps:

  1. Clone shc.
  2. Put HBaseSinkProvider.scala to core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase.
  3. Compile shc.
  4. Run spark-submit with --jars some_path/shc/core/target/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar

My code is written in Python; I'm including it below. The error is: pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources must be executed with writeStream.start();;
I'm new to Spark and have no experience with Scala, so I can't understand the problem. Can you please help me with it?

import io

import avro.io
import avro.schema
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import LongType, StringType, StructField, StructType


def consume(schema_name, brokers, topic, group_id):
    spark = SparkSession \
        .builder \
        .appName('SparkConsumer') \
        .config('hbase.zookeeper.property.clientPort', '2282') \
        .getOrCreate()

    print('read Avro schema from file: {}...'.format(schema_name))
    schema = avro.schema.parse(open(schema_name, 'rb').read())
    reader = avro.io.DatumReader(schema)
    print('the schema is read')

    rows = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', brokers) \
        .option('subscribe', topic) \
        .option('group.id', group_id) \
        .option('maxOffsetsPerTrigger', 1000) \
        .option("startingOffsets", "earliest") \
        .load()
    rows.printSchema()

    # Spark schema of the decoded record (replaces the Avro schema variable above)
    schema = StructType([
        StructField('consumer_id', StringType(), False),
        StructField('audit_system_id', StringType(), False),
        StructField('object_path', StringType(), True),
        StructField('object_type', StringType(), False),
        StructField('what_action', StringType(), False),
        StructField('when', LongType(), False),
        StructField('where', StringType(), False),
        StructField('who', StringType(), True),
        StructField('workstation', StringType(), True)
    ])

    # Decode one Avro-encoded Kafka value into a tuple matching the schema above
    def decode_avro(msg):
        bytes_reader = io.BytesIO(bytes(msg))
        decoder = avro.io.BinaryDecoder(bytes_reader)
        data = reader.read(decoder)
        return (
            data['consumer_id'],
            data['audit_system_id'],
            data['object_path'],
            data['object_type'],
            data['what_action'],
            data['when'],
            data['where'],
            data['who'],
            data['workstation']
        )

    udf_decode_avro = udf(decode_avro, schema)

    values = rows.select('value')
    values.printSchema()

    changes = values.withColumn('change', udf_decode_avro(col('value'))).select('change.*')
    changes.printSchema()

    change_catalog = '''
    {
        "table":
        {
            "namespace": "default",
            "name": "changes",
            "tableCoder": "PrimitiveType"
        },
        "rowkey": "consumer_id",
        "columns":
        {
            "consumer_id": {"cf": "rowkey", "col": "consumer_id", "type": "string"},
            "audit_system_id": {"cf": "d", "col": "audit_system_id", "type": "string"},
            "object_path": {"cf": "d", "col": "object_path", "type": "string"},
            "object_type": {"cf": "d", "col": "object_type", "type": "string"},
            "what_action": {"cf": "d", "col": "what_action", "type": "string"},
            "when": {"cf": "t", "col": "when", "type": "bigint"},
            "where": {"cf": "d", "col": "where", "type": "string"},
            "who": {"cf": "d", "col": "who", "type": "string"},
            "workstation": {"cf": "d", "col": "workstation", "type": "string"}
        }
    }'''

    query = changes \
        .writeStream \
        .format('HBase.HBaseSinkProvider') \
        .option('hbasecat', change_catalog) \
        .option("checkpointLocation", '/tmp/checkpoint') \
        .outputMode("append") \
        .start()

    query.awaitTermination()

@sutugin

sutugin commented Jun 27, 2018

Try writing to the console or to a file. Do you get the same error?

@merfill

merfill commented Jun 27, 2018

No, when I write records to the console, everything is OK. I'm using the following Python code instead of HBase.HBaseSinkProvider:

query = changes \
         .writeStream \
        .outputMode("append") \
        .format('console') \
        .start()

The output is something like this:

Batch: 7

+-----------+---------------+--------------------+-----------+-----------+-------------+-------------+----------------+--------------------+
|consumer_id|audit_system_id| object_path|object_type|what_action| when| where| who| workstation|
+-----------+---------------+--------------------+-----------+-----------+-------------+-------------+----------------+--------------------+
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Removed|1520170874201|172.28.26.190| n/a| n/a|
| 111| 222|\172.28.26.190\T...| File| Added|1520171584122|172.28.26.190|PD6\fsatestuser1|win10test1.pd6.local|
| 111| 222|\172.28.26.190\T...| File| Added|1520171584126|172.28.26.190|PD6\fsatestuser1|win10test1.pd6.local|
+-----------+---------------+--------------------+-----------+-----------+-------------+-------------+----------------+--------------------+
only showing top 20 rows

2018-06-27 14:01:26 INFO WriteToDataSourceV2Exec:54 - Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@30f35fc5 committed.
2018-06-27 14:01:26 INFO SparkContext:54 - Starting job: start at NativeMethodAccessorImpl.java:0
2018-06-27 14:01:26 INFO DAGScheduler:54 - Job 15 finished: start at NativeMethodAccessorImpl.java:0, took 0,000037 s
2018-06-27 14:01:26 INFO MicroBatchExecution:54 - Streaming query made progress: {
"id" : "1e6076ad-b403-46ca-9438-e4913660700d",
"runId" : "7740c36b-dc2d-4880-8ea6-0efde89b1ef5",
"name" : null,
"timestamp" : "2018-06-27T11:01:26.389Z",
"batchId" : 7,
"numInputRows" : 669,
"inputRowsPerSecond" : 883.7516512549538,
"processedRowsPerSecond" : 1327.3809523809523,
"durationMs" : {
"addBatch" : 471,
"getBatch" : 4,
"getOffset" : 2,
"queryPlanning" : 11,
"triggerExecution" : 503,
"walCommit" : 15
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[changes]]",
"startOffset" : {
"changes" : {
"0" : 7000
}
},
"endOffset" : {
"changes" : {
"0" : 7669
}
},
"numInputRows" : 669,
"inputRowsPerSecond" : 883.7516512549538,
"processedRowsPerSecond" : 1327.3809523809523
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@3715c84c"
}
}

@sutugin

sutugin commented Jun 27, 2018

Unfortunately, I don't have a computer with me right now. Try setting outputMode to update. If that doesn't help and you can't find a solution, email me after July 5 and I will try to help.

@merfill

merfill commented Jun 27, 2018

Unfortunately, "update" mode does not work either. I received the same error (see below). Thank you in advance.

pyspark.sql.utils.StreamingQueryException: u'Queries with streaming sources must be executed with writeStream.start();;\nLogicalRDD [key#52, value#53, topic#54, partition#55, offset#56L, timestamp#57, timestampType#58], true\n\n=== Streaming Query ===\nIdentifier: [id = 66509eea-e706-4745-9b35-f82f09752b43, runId = c0e78fe7-f978-4f64-88cd-801d766f78c5]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {KafkaSource[Subscribe[changes]]: {"changes":{"0":7669}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nProject [change#22.consumer_id AS consumer_id#25, change#22.audit_system_id AS audit_system_id#26, change#22.object_path AS object_path#27, change#22.object_type AS object_type#28, change#22.what_action AS what_action#29, change#22.when AS when#30L, change#22.where AS where#31, change#22.who AS who#32, change#22.workstation AS workstation#33]\n+- Project [value#8, decode_avro(value#8) AS change#22]\n +- Project [value#8]\n +- StreamingExecutionRelation KafkaSource[Subscribe[changes]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]\n'

@ddkongbb

Please try this version of shc (https://github.com/sutugin/shc) and compile it against the corresponding HBase/Phoenix version. It worked perfectly for me without the Avro format.

/**
  * The options must include the HBaseTableCatalog.tableCatalog string
  * {{{
  *   inputDF.
  *     writeStream.
  *     format("hbase").
  *     option("checkpointLocation", checkPointProdPath).
  *     options(Map("hbase.catalog" -> catalog)).
  *     outputMode(OutputMode.Update()).
  *     trigger(Trigger.ProcessingTime(30.seconds)).
  *     start
  * }}}
  */

To use Avro, try the following:

/**
  * The options must include the HBaseTableCatalog.tableCatalog string
  * {{{
  *   inputDF.
  *     writeStream.
  *     format("hbase").
  *     option("checkpointLocation", checkPointProdPath).
  *     options(Map("hbase.schema_array" -> schema_array, "hbase.schema_record" -> schema_record, "hbase.catalog" -> catalog)).
  *     outputMode(OutputMode.Update()).
  *     trigger(Trigger.ProcessingTime(30.seconds)).
  *     start
  * }}}
  */

@swarup5s

Hello @sutugin, I've implemented your solution. However, data is not getting updated in HBase, and no exception is thrown either. Can you suggest anything in this regard?

@sutugin

sutugin commented Sep 18, 2018

Hi @swarup5s, if you share your implementation code, how you use it, and the logs, maybe we can find the problem together.

@swarup5s

Hi @sutugin thanks for your help. Appreciate it.

//this class is under ...org/apache/spark/sql/execution/streaming/
//path. I'm executing from IntelliJ..
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.execution.datasources.hbase.Logging
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

import org.apache.spark.sql.execution.datasources.hbase._

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog))
      .format("org.apache.spark.sql.execution.datasources.hbase").save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}

/**
*My code goes here------------------------------------------------------------------------------
*/
//...
//...

def catalog = s"""{
|"table":{"namespace":"default", "name":"hbase_table"},
|"rowkey":"key",
|"columns":{
|"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
|"subscriberName":{"cf":"subscriberInfo", "col":"Name", "type":"string"},
|"subscriberNumber":{"cf":"subscriberInfo", "col":"PhoneNumber", "type":"string"},
|"messageTemplate":{"cf":"messageInfo", "col":"template", "type":"string"},
|"lastTS":{"cf":"messageInfo", "col":"ts", "type":"String"}
|}
|}""".stripMargin

def withCatalog(cat: String): DataFrame = {
  spark.sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}
val BaseRecorddf = withCatalog(catalog) //record from HBase as Batch or normal Dataframe

val streamdf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server_addr")
  .option("subscribe", "topic_name")
  .option("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
  .option("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
  .option("auto.offset.reset","earliest")
  .option("group.id","realTimeStream")
  .option("enable.auto.commit",true: java.lang.Boolean)
  .load()

//streaming Dataframe here
val schemaStreamDf = streamdf
  .selectExpr("CAST(key AS STRING) as IMSI", "CAST(value AS STRING) as Loc")

//...

//some biz logic here and some join between the batch and streaming dataframe and finaldf is the streaming DF
//which should be written back to the HBase in real time.

try{

    finaldfd.
      writeStream.
      format("org.apache.spark.sql.execution.streaming.HBaseSinkProvider").
      option("checkpointLocation", "some_path_here").
      option("hbasecat",catalog).
      outputMode(OutputMode.Update()).
      trigger(Trigger.ProcessingTime(some_seconds))
      .start()
      .awaitTermination()
    
  }
catch {
  case e: Exception => println(e)
}

I've made some changes. Now a StreamingQueryException is thrown, but being a novice I'm not sure what is going wrong. Here are the logs:

org.apache.spark.sql.streaming.StreamingQueryException: null
=== Streaming Query ===
Identifier: [id = a0997feb-efeb-4976-b8eb-5efaa3c1b8c9, runId = 6f4181f0-ffd4-4aa9-a0bc-475457e4e93e]
Current Committed Offsets: {}
Current Available Offsets: {KafkaSource[Subscribe[topic_name]]: {"topic_name":{"2":282,"1":305,"0":293}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [rowkey#75, subscriberName#1, subscriberNumber#2, messageTemplate#3, 1537292624 AS lastTS#82]
+- Project [rowkey#11 AS rowkey#75, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4]
+- Filter (cast(cast(lastTS#4 as bigint) as double) < 1.536990223E9)
+- Project [rowkey#11, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4]
+- Project [rowkey#11, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4, Loc#46]
+- Join Inner, (rowkey#11 = rowkey#45)
:- Project [rowkey#0 AS rowkey#11, subscriberName#1, subscriberNumber#2, messageTemplate#3, lastTS#4]
: +- Relation[rowkey#0,subscriberName#1,subscriberNumber#2,messageTemplate#3,lastTS#4] HBaseRelation(Map(catalog -> {
"table":{"namespace":"default", "name":"hbase_table"},
"rowkey":"key",
"columns":{
"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
"subscriberName":{"cf":"subscriberInfo", "col":"Name", "type":"string"},
"subscriberNumber":{"cf":"subscriberInfo", "col":"PhoneNumber", "type":"string"},
"messageTemplate":{"cf":"messageInfo", "col":"template", "type":"string"},
"lastTS":{"cf":"messageInfo", "col":"ts", "type":"String"}
}
}),None)
+- Project [cast(key#30 as string) AS rowkey#45, cast(value#31 as string) AS Loc#46]
+- StreamingExecutionRelation KafkaSource[Subscribe[topic_name]], [key#30, value#31, topic#32, partition#33, offset#34L, timestamp#35, timestampType#36]

On the other hand, messages are written to the console successfully.

@stefcorda

Hello @sutugin, first of all, thank you for your great help. I'm experiencing @swarup5s's problem when I call:

data.sparkSession.createDataFrame(data.rdd, data.schema)

The same happens whenever I call something like
data.rdd

I think the problem is outside your code, somewhere else. Maybe more spark related?

@sutugin

sutugin commented Oct 11, 2018

Hi @sympho410!
Unfortunately, I can't find the problem from a few lines of code; I would need to debug it and look for the cause. My only suggestion: try changing the order of the columns to match the order you specified in the schema.

@stefcorda

Hello @sutugin, I only just noticed that you replied!
I managed to get past that problem thanks to another one of your commits:

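import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters

val schema = data.schema

// Convert the internal rows of the already-planned batch instead of calling data.rdd
val res = data.queryExecution.toRdd.mapPartitions { rows =>
  val converter = CatalystTypeConverters.createToScalaConverter(schema)
  rows.map(converter(_).asInstanceOf[Row])
}

val df = data.sparkSession.createDataFrame(res, schema)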

Now it works properly - Thank you again for your help :)

@vibnimit

vibnimit commented Jul 19, 2019

Hello @sutugin and @sympho410, I am working on a similar problem: I want to do bulk puts to HBase from Spark Structured Streaming. I see that the code above tries to do that, but I don't understand the use of the catalog here. It looks like a predefined schema, but since HBase is schema-less, I can add new columns at any time in the future, so how can I fix a catalog in advance?
Can anyone explain what I am missing, and what exactly the purpose and meaning of the catalog is?
Also, can anyone explain what "5" means in this line: HBaseTableCatalog.newTable -> "5"?
Any help is greatly appreciated.

Thanks in advance!

@sutugin

sutugin commented Jul 23, 2019

It seems to me that the purpose of the catalog is to structure the data properly for serialization and deserialization. The need to specify a schema is a feature of this library's implementation and is not tied to Structured Streaming. You can try to work around this limitation by generating a schema on the fly, based on the schema of the data inside each batch, but you must be sure that all rows in the batch have the same schema, or else use a foreach writer and derive the schema for each row separately.
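
Generating a catalog on the fly could look roughly like the sketch below. It is only an illustration in plain Python, not part of SHC: `build_catalog` and its parameters are hypothetical names, the default column family `d` is an assumption, and the JSON layout simply mirrors the catalogs posted earlier in this thread.

```python
import json


def build_catalog(table, rowkey_field, fields, column_family="d", namespace="default"):
    """Build an SHC-style catalog string (hypothetical helper, not an SHC API).

    fields is a list of (column_name, shc_type) pairs, for example derived
    from the schema of the current batch.
    """
    columns = {}
    for name, shc_type in fields:
        # The row key column uses the special "rowkey" column family,
        # as in the catalogs shown earlier in this thread.
        cf = "rowkey" if name == rowkey_field else column_family
        columns[name] = {"cf": cf, "col": name, "type": shc_type}

    if rowkey_field not in columns:
        raise ValueError("row key field %r is not among the fields" % rowkey_field)

    catalog = {
        "table": {"namespace": namespace, "name": table},
        "rowkey": rowkey_field,
        "columns": columns,
    }
    return json.dumps(catalog, indent=2)


catalog_json = build_catalog(
    "changes",
    "consumer_id",
    [("consumer_id", "string"), ("object_path", "string"), ("when", "bigint")],
)
print(catalog_json)
```

The resulting string would then be passed wherever the thread passes a hand-written catalog, for example as the `hbasecat` option of the sink above. Every row written with one catalog still has to share that set of columns.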

@omkarahane

omkarahane commented Oct 5, 2019

Can anyone please provide a compiled jar with HBaseSink included? I tried building the shc project, but I get this error:

HBaseRelation.scala:108: value foreach is not a member of Nothing
        hBaseConfiguration.foreach(_.foreach(e => conf.set(e._1, e._2)))
                                     ^

I also tried implementing the "hbaseSink" class in my project with SHC as a Maven dependency, but it is not working; I get this error:

Queries with streaming sources must be executed with writeStream.start();

It would be very helpful if I could get the compiled jar. Thanks.

@sutugin

sutugin commented Oct 8, 2019

Try building from my fork (https://github.com/sutugin/shc), though I have not updated it for a long time. Just set the Spark version you actually use in pom.xml; for me it is
<spark.version>2.3.1</spark.version>

@omkarahane

@sutugin Thanks for replying. I'm working on the Databricks platform, which runs Spark 2.4.3, so I have access to the foreachBatch API and the above is no longer needed.
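
For readers on Spark 2.4 or later, the foreachBatch approach mentioned here might look roughly like the PySpark sketch below. `make_hbase_batch_writer` is a hypothetical helper, not part of SHC. The format name is the one used earlier in this thread, and the `catalog` option key is assumed to be the value of `HBaseTableCatalog.tableCatalog`, as the `Map(catalog -> ...)` in the logical plan posted above suggests.

```python
# Data source name used for batch writes earlier in this thread.
SHC_FORMAT = "org.apache.spark.sql.execution.datasources.hbase"


def make_hbase_batch_writer(catalog):
    """Return a function with the (batch_df, batch_id) shape that foreachBatch expects.

    Hypothetical helper: catalog is the SHC catalog JSON string.
    """
    def write_batch(batch_df, batch_id):
        # Inside foreachBatch the micro-batch is an ordinary DataFrame,
        # so the regular batch writer can be used.
        (batch_df.write
            .options(catalog=catalog)  # assumed option key, see the note above
            .format(SHC_FORMAT)
            .save())

    return write_batch


# Usage sketch (needs a running Spark session and SHC on the classpath):
# query = (changes.writeStream
#          .foreachBatch(make_hbase_batch_writer(change_catalog))
#          .option("checkpointLocation", "/tmp/checkpoint")
#          .start())
# query.awaitTermination()
```

With this approach no custom Sink or StreamSinkProvider class is needed. Because a micro-batch can be retried after a failure, the write should be safe to repeat; HBase puts for the same row key and columns are generally applied as overwrites, which helps here.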

@sutugin

sutugin commented Oct 8, 2019

@omkarahane Good idea, I already described this method to someone in #238 (comment).

@omkarahane

@sutugin I'm still facing an issue, though it is different from the one I mentioned above. I have described all the details at the link below; please see if you can help.
https://stackoverflow.com/questions/58306725/not-able-to-write-data-to-hbase-using-spark-structured-streaming

Thanks.

@sutugin

sutugin commented Oct 10, 2019

@omkarahane, try building a "fat" jar with the sbt dependency libraryDependencies += "com.hortonworks.shc" % "shc-core" % "1.1.0.3.1.2.1-1".
I think this will solve the problem with "java.lang.NoClassDefFoundError".

@omkarahane

I tried running the job with a fat jar created using Maven, but the issue still wasn't resolved. I assume fat jars created with sbt and Maven would be almost the same?

@sutugin

sutugin commented Oct 10, 2019

@omkarahane, maybe this will help you #223 (comment)

@omkarahane

@sutugin, thanks a lot, you pointed me in the right direction: the HBase jars were missing. I added those jars and installed them as a library on the cluster so that the job has access to them. That solved my initial problem, but now I'm getting another exception:

java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;

This also seems to be a dependency issue. This is what I have tried:

  1. Uploaded the following jars: json4s-ast, json4s-core,json4s-jackson
  2. Versions tried, 3.4,3.5,3.6
  3. Put a maven dependency in my jar, built fat jar and uploaded it as library, so transitive dependencies can be satisfied.

Still getting the same error.

@sutugin

sutugin commented Oct 11, 2019

@omkarahane, Similar problems are described here:

@631068264

631068264 commented Nov 20, 2019

@sutugin @merfill
I added sutugin's Scala code and compiled it as merfill described.

But I get an error. I use Java:

  • spark-2.3.2
  • hbase-2.0.1
  • scala 2.11.12
  • jdk 1.8

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;


public class KafkaStructStream implements Serializable {

    private String servers;
    private String jks;
    private String schema;

    public KafkaStructStream(String[] args) {
//        this.servers = args[0];
//        this.jks = args[1];
    }

    private Dataset<Row> initStructKafka() throws IOException {
        Properties prop = Config.getProp();
        this.schema = prop.getProperty("hbase.traffic.schema");
        SparkSession spark = SparkSession
                .builder()
                .appName("Kafka")
                .master("local[*]")
                .getOrCreate();
        return spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", prop.getProperty("kafka.broker.list"))
                .option("kafka.ssl.truststore.location", Config.getPath(Config.KAFKA_JKS))
//                .option("kafka.bootstrap.servers", this.servers)
//                .option("kafka.ssl.truststore.location", this.jks)
                .option("kafka.ssl.truststore.password", prop.getProperty("kafka.jks.passwd"))
                .option("kafka.security.protocol", "SSL")
                .option("kafka.ssl.endpoint.identification.algorithm", "")
                .option("startingOffsets", "latest")
//                .option("subscribe", kafkaProp.getProperty("kafka.topic"))

                .option("subscribe", "traffic")
                .load()
                .selectExpr("CAST(topic AS STRING)", "CAST(value AS STRING)");
    }

    private void run() {
        Dataset<Row> df = null;
        try {
            df = initStructKafka();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        df.printSchema();

        StructType trafficSchema = new StructType()
                .add("guid", DataTypes.StringType)
                .add("time", DataTypes.LongType)
                .add("end_time", DataTypes.LongType)
                .add("srcip", DataTypes.StringType)
                .add("srcmac", DataTypes.StringType)
                .add("srcport", DataTypes.IntegerType)
                .add("destip", DataTypes.StringType)
                .add("destmac", DataTypes.StringType)
                .add("destport", DataTypes.IntegerType)
                .add("proto", DataTypes.StringType)
                .add("appproto", DataTypes.StringType)
                .add("upsize", DataTypes.LongType)
                .add("downsize", DataTypes.LongType);

        Dataset<Row> ds = df.select(functions.from_json(df.col("value").cast(DataTypes.StringType), trafficSchema).as("data")).select("data.*");
        StreamingQuery query = ds.writeStream()
                .format("HBase.HBaseSinkProvider")
                .option("HBaseTableCatalog.tableCatalog", this.schema)
                .option("checkpointLocation", "/tmp/checkpoint")
                .start();

//        StreamingQuery query = ds.writeStream().format("console")
//                .trigger(Trigger.Continuous("2 seconds"))
//                .start();
        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        KafkaStructStream k = new KafkaStructStream(args);
        k.run();
    }

}

ERROR

19/11/20 15:35:09 ERROR MicroBatchExecution: Query [id = 3f3688bb-6c3d-45bc-ab33-23968069abc0, runId = 0346659e-cb5f-4ee2-919a-00ca124e1e3e] terminated with error
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [key#66, value#67, topic#68, partition#69, offset#70L, timestamp#71, timestampType#72], true

	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
	at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2980)
	at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2978)
	at HBase.HBaseSink.addBatch(HBaseSinkProvider.scala:14)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
org.apache.spark.sql.streaming.StreamingQueryException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [key#66, value#67, topic#68, partition#69, offset#70L, timestamp#71, timestampType#72], true

=== Streaming Query ===
Identifier: [id = 3f3688bb-6c3d-45bc-ab33-23968069abc0, runId = 0346659e-cb5f-4ee2-919a-00ca124e1e3e]
Current Committed Offsets: {}
Current Available Offsets: {KafkaSource[Subscribe[traffic]]: {"traffic":{"0":118641202}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [data#25.guid AS guid#27, data#25.time AS time#28L, data#25.end_time AS end_time#29L, data#25.srcip AS srcip#30, data#25.srcmac AS srcmac#31, data#25.srcport AS srcport#32, data#25.destip AS destip#33, data#25.destmac AS destmac#34, data#25.destport AS destport#35, data#25.proto AS proto#36, data#25.appproto AS appproto#37, data#25.upsize AS upsize#38L, data#25.downsize AS downsize#39L]
+- Project [jsontostructs(StructField(guid,StringType,true), StructField(time,LongType,true), StructField(end_time,LongType,true), StructField(srcip,StringType,true), StructField(srcmac,StringType,true), StructField(srcport,IntegerType,true), StructField(destip,StringType,true), StructField(destmac,StringType,true), StructField(destport,IntegerType,true), StructField(proto,StringType,true), StructField(appproto,StringType,true), StructField(upsize,LongType,true), StructField(downsize,LongType,true), cast(value#22 as string), Some(Asia/Shanghai), true) AS data#25]
   +- Project [cast(topic#9 as string) AS topic#21, cast(value#8 as string) AS value#22]
      +- StreamingExecutionRelation KafkaSource[Subscribe[traffic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [key#66, value#67, topic#68, partition#69, offset#70L, timestamp#71, timestampType#72], true

	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
	at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2980)
	at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2978)
	at HBase.HBaseSink.addBatch(HBaseSinkProvider.scala:14)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	... 1 more

sutugin commented Nov 21, 2019

Hi @631068264, if you build from my fork, try specifying the format: "org.apache.spark.sql.execution.streaming.HBaseStreamSinkProvider" or "hbase"

@Saimukunth

Hi,

I'm using Spark Structured Streaming to process messages ingested from Kafka and store the processed data in HBase. The following error comes up:

**ERROR ConnectionManager$HConnectionImplementation: The node /hbase is not in ZooKeeper. It should have been written by the master. Check the value configured in 'zookeeper.znode.parent'. There could be a mismatch with the one configured in the master.**

I tried passing hbase-site.xml to spark-submit, but no luck. The hbase-site.xml sets the property "zookeeper.znode.parent" to "/hbase-unsecure".

My spark-submit parameters are:
spark-submit \
  --class CountryCountStreaming \
  --master yarn-client \
  --conf spark.ui.port=4926 \
  --jars $(echo /home/venkateshramanpc5546/external_jars/*.jar | tr ' ' ',') \
  --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 \
  --repositories http://repo.hortonworks.com/content/groups/public/ \
  --files /usr/hdp/current/hbase-client/conf/hbase-site.xml \
  kafkasparkstreamingdemo_2.11-0.1.jar cloudxlab /tmp/venkatesh/retail_schema/Retail_Logs.json /tmp/venkatesh/retail_checkpoint

The stack version in the cluster is given below:

Hadoop 2.7.3
HBase 1.1.2
Zookeeper 3.4.6
Kafka 0.10.1
Spark 2.1.1

Please find the build.sbt and the scala classes attached for your reference.

Kindly let me know whether any HBase configuration (ZooKeeper quorum, ZooKeeper client port, ZooKeeper znode parent) can be set at the step where we write the data to a table, which is:

df.write.
  options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog,
    HBaseTableCatalog.newTable -> "4")).
  format(defaultFormat).
  save()
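
Since the error reports a lookup under /hbase while the cluster uses /hbase-unsecure, it may help to first confirm which value the hbase-site.xml shipped with --files actually declares. Below is a minimal sketch in plain Java using only the JDK XML parser. The class name HBaseSiteCheck, the helper readProperty, and the inline XML (including the zk1.example.com quorum) are hypothetical placeholders, not part of SHC or HBase; in practice the XML would be read from the real file.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

final class HBaseSiteCheck {

    // Returns the <value> of the <property> whose <name> equals `name`,
    // or null if the file does not declare that property.
    static String readProperty(String xml, String name) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList props = doc.getElementsByTagName("property");
            for (int i = 0; i < props.getLength(); i++) {
                Element p = (Element) props.item(i);
                Node n = p.getElementsByTagName("name").item(0);
                Node v = p.getElementsByTagName("value").item(0);
                if (n != null && v != null && name.equals(n.getTextContent().trim())) {
                    return v.getTextContent().trim();
                }
            }
            return null;
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse hbase-site.xml content", e);
        }
    }

    public static void main(String[] args) {
        // Placeholder content standing in for the real hbase-site.xml.
        String xml =
              "<configuration>"
            + "<property><name>hbase.zookeeper.quorum</name><value>zk1.example.com</value></property>"
            + "<property><name>zookeeper.znode.parent</name><value>/hbase-unsecure</value></property>"
            + "</configuration>";
        // Prints: /hbase-unsecure
        System.out.println(readProperty(xml, "zookeeper.znode.parent"));
    }
}
```

If the file declares the expected value but the client still looks under /hbase, the file is probably not being picked up by the process that opens the HBase connection.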

StructuredStreaming.zip

sutugin commented Apr 24, 2020

Hi, @Saimukunth!
Is the error also reproduced if you write in batch mode rather than with Structured Streaming?
I think the problem is the same as #150.

Saimukunth commented Apr 24, 2020 via email

sutugin commented Apr 24, 2020

@Saimukunth Try the second approach described in #150 (comment).

@Saimukunth

Hi,

Thanks, I was able to resolve the above issue using the SHC jar. Data is now being inserted into HBase, but not in the layout I wanted.

Expected:
2018-08-27T01:57:00.000Z column=metrics:Algeria, timestamp=1535335088874, value=1
2018-08-27T01:57:00.000Z column=metrics:Brazil, timestamp=1535335089093, value=1
2018-08-27T01:57:00.000Z column=metrics:Canada, timestamp=1535335088664, value=1
2018-08-27T01:57:00.000Z column=metrics:China, timestamp=1535335089345, value=3
2018-08-27T01:57:00.000Z column=metrics:Czechia, timestamp=1535335088651, value=1
2018-08-27T01:57:00.000Z column=metrics:Hong Kong, timestamp=1535335089496, value=1

Actual:
2020-05-06T10:52:00.000Z column=metrics:countryCount, timestamp=1588762512336, value=1
2020-05-06T10:52:00.000Z column=metrics:countryName, timestamp=1588762512336, value=United Kingdom
2020-05-06T10:53:00.000Z column=metrics:countryCount, timestamp=1588762508359, value=3
2020-05-06T10:53:00.000Z column=metrics:countryName, timestamp=1588762508359, value=Brazil

This is my hbase catalog file,
def catalog =
  s"""{
     |"table":{"namespace":"default", "name":"country_count"},
     |"rowkey":"window",
     |"columns":{
     |"window":{"cf":"rowkey", "col":"window", "type":"string"},
     |"countryName":{"cf":"metrics", "col":"countryName", "type":"string"},
     |"countryCount":{"cf":"metrics", "col":"countryCount", "type":"int"}
     |}
     |}""".stripMargin

val getCountryCountDF: DataFrame = spark.sql("select countryName, " +
    "cast(rounded_timestamp as string) as window, count(1) as countryCount from retail " +
    "group by countryName, cast(rounded_timestamp as string)").
  selectExpr("window", "countryName", "CAST(countryCount as STRING)")

val finalDF = getCountryCountDF.
  writeStream.
  queryName("Retail Logs Writer").
  format("HBase.HBaseStreamSinkProvider").
  option("hbasecatalog", catalog).
  option("checkpointLocation", args(2)).
  outputMode("update").
  trigger(Trigger.ProcessingTime("20 seconds")).
  start

In the HBaseStreamSinkProvider, I'm writing to hbase using,

val schema = data.schema
val res: RDD[Row] = data.queryExecution.toRdd.mapPartitions { rows =>
  val converter = CatalystTypeConverters.createToScalaConverter(schema)
  rows.map(converter(_).asInstanceOf[Row])
}
val df = data.sparkSession.createDataFrame(res, schema)
df.write.
  options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog,
    HBaseTableCatalog.newTable -> "4")).
  format(defaultFormat).
  save()

Is there any way to use the HBaseTableCatalog class to get the desired result?

Thanks and regards,
Venkatesh Raman
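
The catalog above binds each DataFrame column to a fixed qualifier (metrics:countryName, metrics:countryCount), which matches the actual output. The expected layout instead uses the country name itself as the qualifier, and as far as I can tell a static catalog of this shape does not express that. One possible direction is to build the cells per record, outside the catalog, for example with the HBase client's Put.addColumn(family, qualifier, value). The sketch below only models that mapping in plain Java with no HBase dependency; the class CountryCellSketch and its Cell type are hypothetical names used for illustration.

```java
final class CountryCellSketch {

    // A simplified stand-in for one HBase cell: row key, column family, qualifier, value.
    static final class Cell {
        final String rowKey;
        final String family;
        final String qualifier;
        final String value;

        Cell(String rowKey, String family, String qualifier, String value) {
            this.rowKey = rowKey;
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }

        @Override
        public String toString() {
            return rowKey + " column=" + family + ":" + qualifier + ", value=" + value;
        }
    }

    // Maps one (window, countryName, countryCount) record to a cell whose
    // qualifier is the country name and whose value is the count.
    static Cell toCell(String window, String countryName, long countryCount) {
        return new Cell(window, "metrics", countryName, Long.toString(countryCount));
    }

    public static void main(String[] args) {
        // Prints: 2018-08-27T01:57:00.000Z column=metrics:Algeria, value=1
        System.out.println(toCell("2018-08-27T01:57:00.000Z", "Algeria", 1));
        // Prints: 2018-08-27T01:57:00.000Z column=metrics:China, value=3
        System.out.println(toCell("2018-08-27T01:57:00.000Z", "China", 3));
    }
}
```

With a real HBase client, each such cell would correspond to one addColumn call on a Put keyed by the window, so all countries for the same window end up in a single row.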
