Compatibility problems with Spark 2.3.0 #13

Open
fhuertas opened this issue Jun 5, 2018 · 7 comments
Comments

@fhuertas
Owner

fhuertas commented Jun 5, 2018

This method does not work with Spark 2.3.0: the DataFrame passed to the addBatch function is now a streaming Dataset, which cannot be saved with the official batch connector.

@dannybusch

dannybusch commented Jun 18, 2018

Is there any news on how to write a Spark Cassandra sink for Structured Streaming in Spark 2.3.x?

@fhuertas
Owner Author

fhuertas commented Jun 18, 2018 via email

@fhuertas
Owner Author

I have confirmed that Spark 2.3.1 also keeps the DataFrame as streaming, so this solution is not valid there either.

When I have time, I will try to implement it another way, maybe using foreach.
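
For reference, the foreach route mentioned above could look roughly like the sketch below. It relies only on Spark's `ForeachWriter` API; the class name `RowSinkWriter` and the `write` callback are hypothetical placeholders for whatever actually stores a row (for example a Cassandra driver session), and nothing here is taken from this repository.

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical helper: forwards every streamed row to a user-supplied function.
// `write` stands in for the real storage call and must be serializable,
// because the writer is shipped to the executors.
class RowSinkWriter(write: Row => Unit) extends ForeachWriter[Row] {

  // Called once per partition and trigger; returning true means "process this partition".
  override def open(partitionId: Long, version: Long): Boolean = true

  // Called once per row of the partition.
  override def process(value: Row): Unit = write(value)

  // Called when the partition finishes; errorOrNull is non-null on failure.
  override def close(errorOrNull: Throwable): Unit = ()
}
```

A streaming DataFrame `df` would then be written with `df.writeStream.foreach(new RowSinkWriter(row => println(row))).start()`. The trade-off is that rows are handled one at a time, so the batch connector's bulk write path is not used.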

@redsk

redsk commented Aug 23, 2018

Hi @fhuertas, do you have any news on this?

@gitsparky

The idea from a PR for Hadoop seems to work, see:
hortonworks-spark/shc#238

@fhuertas
Owner Author

fhuertas commented Nov 2, 2018

This weekend I'm going to try to adapt this code and test if it works.

override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
  // use a local variable to make sure the map closure doesn't capture the whole DataFrame
  val schema = data.schema
  val res = data.queryExecution.toRdd.mapPartitions { rows =>
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
    rows.map(converter(_).asInstanceOf[Row])
  }

  val df = sqlContext.sparkSession.createDataFrame(res, schema)
  df.write
    .options(specifiedHBaseParams)
    .format(defaultFormat)
    .save()
}

It looks good and could be a good solution.
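
Adapted to a Cassandra sink, the same idea might be sketched as follows. This is an untested illustration, not the code of this repository: the names `BatchCopy` and `CassandraSinkSketch` and the `keyspace`/`table` parameters are hypothetical, and it assumes the spark-cassandra-connector is on the classpath so that the `org.apache.spark.sql.cassandra` format is available.

```scala
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical helper: rebuilds a plain batch DataFrame from the rows of `data`.
object BatchCopy {
  def toBatch(data: DataFrame): DataFrame = {
    // Local variable so the closure captures only the schema, not the DataFrame.
    val schema = data.schema
    val rows = data.queryExecution.toRdd.mapPartitions { internalRows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      internalRows.map(converter(_).asInstanceOf[Row])
    }
    data.sparkSession.createDataFrame(rows, schema)
  }
}

// Hypothetical sink: writes each micro-batch through the batch Cassandra connector.
class CassandraSinkSketch(keyspace: String, table: String) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    BatchCopy.toBatch(data).write
      .format("org.apache.spark.sql.cassandra") // assumes the connector is available
      .options(Map("keyspace" -> keyspace, "table" -> table))
      .mode(SaveMode.Append)
      .save()
  }
}
```

The conversion step is kept in its own function so it can be checked without a Cassandra cluster. Registering the sink with Structured Streaming (for example through a `StreamSinkProvider`) is left out of the sketch.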

@fhuertas
Owner Author

fhuertas commented Nov 6, 2018

I have run tests that pass with Spark 2.3.0, 2.3.1 and 2.3.2, and it looks good. I will upload a branch with the preliminary version this afternoon.

It is the same idea as the one proposed in hortonworks-spark/shc#238

Thanks @gitsparky for the link
