From 2827bed5fadfb44022dc560620aab6640ae7bfff Mon Sep 17 00:00:00 2001
From: Egor Krivokon <33221486+ekrivokonmapr@users.noreply.github.com>
Date: Mon, 7 Oct 2019 12:20:43 +0300
Subject: [PATCH] MapR [SPARK-619] Move absent commits from 2.4.3 branch to 2.4.4 (#574)

* Adding SQL API to write to Kafka from Spark (#567)

* Branch 2.4.3 extended kafka and examples (#569)

* The v2 API is in its own package
- the v2 API is in a different package
- the old functionality is available in a separate package

* v2 API examples
- All the examples are using the newest API.
- I have removed the old examples since they are not relevant any more and the same functionality is shown in the new examples using the new API.

* MapR [SPARK-619] Move absent commits from 2.4.3 branch to 2.4.4
---
 build/dev-build.sh                            |  2 +-
 .../streaming/KafkaProducerExample.scala      | 97 ++++++++++---------
 external/kafka-producer/pom.xml               |  5 +
 .../kafka/producer/RDDFunctions.scala         |  3 +-
 .../streaming/kafka/producer/package.scala    |  6 +-
 .../kafka/producer/sql/CommittedIds.scala     |  5 +
 .../producer/sql/KafkaDataSourceWriter.scala  | 38 ++++++++
 .../producer/sql/KafkaDataWriterFactory.scala | 74 ++++++++++++++
 .../kafka/producer/sql/KafkaWriter.scala      | 18 ++++
 .../streaming/kafka/v2/producer/package.scala | 95 ++++++++++++++++++
 pom.xml                                       |  2 +
 11 files changed, 294 insertions(+), 51 deletions(-)
 create mode 100644 external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/CommittedIds.scala
 create mode 100644 external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaDataSourceWriter.scala
 create mode 100644 external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaDataWriterFactory.scala
 create mode 100644 external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaWriter.scala
 create mode 100644 external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/v2/producer/package.scala

diff --git a/build/dev-build.sh b/build/dev-build.sh
index a37e4a04e4c95..48c934c4ff23a 100755
--- a/build/dev-build.sh
+++ b/build/dev-build.sh
@@ -12,5 +12,5 @@ fi
 
 if [ $? -ne 0 ]; then exit 1; fi
 
-scp -r assembly/target/scala-2.11/jars mapr@node1:/opt/mapr/spark/spark-2.0.1/jars
+scp -r assembly/target/scala-2.11/jars mapr@node1:/opt/mapr/spark/spark-2.4.4/jars
 if [ $? -ne 0 ]; then exit 1; fi
\ No newline at end of file
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaProducerExample.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaProducerExample.scala
index 6960655a432ee..d38de66b840e7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaProducerExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaProducerExample.scala
@@ -17,48 +17,24 @@ package org.apache.spark.examples.streaming
 
-import java.util.{ Map => JMap }
+import java.util.{Map => JMap}
 
 import org.apache.kafka.common.serialization.Serializer
-
 import org.apache.spark.SparkConf
+
+import scala.util.Random
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}
+import org.apache.spark.streaming.kafka.v2.producer._
+import org.apache.spark.sql.{DataFrame, SparkSession, Row}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, LongType}
 
-class ItemJsonSerializer extends Serializer[Item] {
-  override def configure(configs: JMap[String, _], isKey: Boolean): Unit = { /* NOP */ }
-
-  override def serialize(topic: String, data: Item): Array[Byte] = data.toString.getBytes
-
-  override def close(): Unit = { /* NOP */ }
-}
-
-case class Item(id: Int, value: Int) {
-  override def toString: String = s"""{"id":"$id","value":"$value"}"""
-}
-
-/**
- * Produces messages to Kafka.
- * Usage: KafkaProducerExample <kafkaBrokers> <topics> <numMessages>
- *   <kafkaBrokers> is a list of one or more kafka brokers
- *   <topics> is a list of one or more kafka topics
- *   <numMessages> is the number of messages that the kafka producer should send
- *
- * Example:
- *    `$ bin/run-example \
- *    org.apache.spark.examples.streaming.KafkaProducerExample broker1,broker2 \
- *    topic1,topic2 10`
- */
-
-// scalastyle:off println
-object KafkaProducerExample extends App {
-  import org.apache.spark.streaming.kafka.producer._
+object KafkaProducerExample extends App {
 
-  if (args.length < 3) {
+  if (args.length < 2) {
     System.err.println(s"""
-      |Usage: Usage: KafkaProducerExample <kafkaBrokers> <topics> <numMessages>
-      |  <kafkaBrokers> is a list of one or more kafka brokers
+      |Usage: KafkaProducerExample <topics> <numMessages>
       |  <topics> is a list of one or more kafka topics
       |  <numMessages> is the number of messages that the kafka producer
       |                should send
@@ -66,29 +42,58 @@ object KafkaProducerExample extends App {
     System.exit(1)
   }
 
-  val Array(kafkaBrokers, topics, numMessages) = args
-
-  val batchTime = Seconds(2)
+  val Array(topics, numMessages) = args
 
   val sparkConf = new SparkConf()
     .set("spark.executor.memory", "1g")
     .set("spark.driver.memory", "1g")
     .setAppName(getClass.getCanonicalName)
+
+  implicit val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
+
+  val items = (0 until numMessages.toInt).map(_.toString)
+
+  val stringRDD: RDD[String] = sparkSession.sparkContext.parallelize(items)
+
+  // An RDD[String] can be written to Kafka directly using the new v2 API
+
+  stringRDD.sendToKafka(topics)
+
+  val rnd = new Random()
+
+  // Create an RDD of Rows
+  val anotherRDD = stringRDD.map(s => Row(s, s.length, rnd.nextLong()))
+
+  val schema = new StructType()
+    .add(StructField("value", StringType))
+    .add(StructField("length", IntegerType))
+    .add(StructField("some_long", LongType))
+
+  // Create a DataFrame with the schema above
+  val dataFrame: DataFrame = sparkSession.createDataFrame(anotherRDD, schema)
+
+  // Any DataFrame can easily be written to Kafka
+  dataFrame.sendToKafka(topics)
+
+  val intRDD: RDD[(Int, Int)] = sparkSession.sparkContext.parallelize(0 until numMessages.toInt).map(n => (n, n.toString.length))
+
+  val transformer = (v: (Int, Int)) => Row(v._1, v._2)
+
+  // Given an RDD[A], a function A => Row and a schema, we can write to Kafka easily
+  intRDD.sendToKafka(topics, transformer, new StructType().add(StructField("value", IntegerType)).add(StructField("length", IntegerType)))
+
+  val batchTime = Seconds(2)
 
   val ssc = new StreamingContext(sparkConf, batchTime)
 
-  val producerConf = new ProducerConf(bootstrapServers = kafkaBrokers.split(",").toList)
-    .withKeySerializer("org.apache.kafka.common.serialization.ByteArraySerializer")
-    .withValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
+  val stringStream: DStream[String] = new ConstantInputDStream[String](ssc, stringRDD)
+
+  stringStream.sendToKafka(topics)
 
-  val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
-  val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
-  val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)
+  val someStream = new ConstantInputDStream[(Int, Int)](ssc, intRDD)
 
-  dStream.foreachRDD(_.sendToKafka(topics, producerConf))
-  dStream.count().print()
+  someStream.sendToKafka(topics, transformer, new StructType().add(StructField("value", IntegerType)).add(StructField("length", IntegerType)))
 
   ssc.start()
   ssc.awaitTermination()
-
-  ssc.stop(stopSparkContext = true, stopGracefully = true)
-}
+}
\ No newline at end of file
diff --git a/external/kafka-producer/pom.xml b/external/kafka-producer/pom.xml
index bdd633ab1b699..5382012573f4b 100644
--- a/external/kafka-producer/pom.xml
+++ b/external/kafka-producer/pom.xml
@@ -64,6 +64,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
diff --git a/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/RDDFunctions.scala b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/RDDFunctions.scala
index 595fd650d54d0..eb9d0f766ac3c 100644
--- a/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/RDDFunctions.scala
+++ b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/RDDFunctions.scala
@@ -22,6 +22,7 @@ import scala.language.implicitConversions
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
 class RDDFunctions[T](rdd: RDD[T]) {
   def sendToKafka(topic: String, conf: ProducerConf): Unit = {
@@ -43,4 +44,4 @@ class PairRDDFunctions[K, V](rdd: RDD[(K, V)]) {
       }
     })
   }
-}
+}
\ No newline at end of file
diff --git a/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/package.scala b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/package.scala
index ba2e7619a375d..b1c1ab367456e 100644
--- a/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/package.scala
+++ b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/package.scala
@@ -18,14 +18,14 @@ package org.apache.spark.streaming.kafka
 
 import scala.language.implicitConversions
-
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession, Row}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 
 package object producer {
 
   implicit def toRDDFunctions[T](rdd: RDD[T]):
     RDDFunctions[T] = new RDDFunctions[T](rdd)
 
   implicit def toPairRDDFunctions[K, V](rdd: RDD[(K, V)]):
-    PairRDDFunctions[K, V] = new PairRDDFunctions[K, V](rdd)
-
+    PairRDDFunctions[K, V] = new PairRDDFunctions[K, V](rdd)
 }
diff --git a/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/CommittedIds.scala b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/CommittedIds.scala
new file mode 100644
index 0000000000000..7cc738ba6e730
--- /dev/null
+++ b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/CommittedIds.scala
@@ -0,0 +1,5 @@
+package org.apache.spark.streaming.kafka.producer.sql
+
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
+
+private case class CommittedIds(partitionId: Int, ids: Set[String]) extends WriterCommitMessage
\ No newline at end of file
diff --git a/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaDataSourceWriter.scala b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaDataSourceWriter.scala
new file mode 100644
index 0000000000000..f0ce5456f6ae6
--- /dev/null
+++ b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaDataSourceWriter.scala
@@ -0,0 +1,38 @@
+package org.apache.spark.streaming.kafka.producer.sql
+
+import java.util.concurrent.Future
+
+import org.apache.spark.streaming.kafka.producer.sql.CommittedIds
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.types.{DataType, StringType, StructType}
+import org.apache.spark.streaming.kafka.producer.ProducerConf
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+
+
+private class KafkaDataSourceWriter(topic: String, schema: StructType) extends DataSourceWriter with Logging {
+
+  private var globallyCommittedIds = List.empty[String]
+
+  override def createWriterFactory(): DataWriterFactory[InternalRow] = new KafkaDataWriterFactory(topic, schema)
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+
+    val ids = messages.foldLeft(Set.empty[String]) { case (acc, CommittedIds(partitionId, partitionIds)) =>
+      log.info(s"PARTITION $partitionId HAS BEEN CONFIRMED BY DRIVER")
+
+      acc ++ partitionIds
+    }
+
+    // Let's make sure this is thread-safe
+    globallyCommittedIds = this.synchronized {
+      globallyCommittedIds ++ ids
+    }
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    log.info("JOB BEING ABORTED")
+  }
+}
\ No newline at end of file
diff --git a/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaDataWriterFactory.scala b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaDataWriterFactory.scala
new file mode 100644
index 0000000000000..6883250b256ea
--- /dev/null
+++ b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaDataWriterFactory.scala
@@ -0,0 +1,74 @@
+package org.apache.spark.streaming.kafka.producer.sql
+
+import java.util.concurrent.Future
+import java.util.concurrent.Future
+
+import org.apache.spark.streaming.kafka.producer.sql.CommittedIds
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.types.{DataType, StringType, StructType}
+import org.apache.spark.streaming.kafka.producer.ProducerConf
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+
+import scala.util.parsing.json.{JSONArray, JSONObject}
+
+private class KafkaDataWriterFactory(topic: String, schema: StructType) extends DataWriterFactory[InternalRow] {
+
+  @transient private lazy val producerConf = new ProducerConf(
+    bootstrapServers = "".split(",").toList)
+
+  @transient private lazy val producer = new KafkaProducer[String, String](producerConf.asJMap())
+
+  override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] = new DataWriter[InternalRow] with Logging {
+
+    private val writtenIds = scala.collection.mutable.ListBuffer.empty[Future[RecordMetadata]]
+
+    log.info(s"PROCESSING PARTITION ID: $partitionId ; TASK ID: $taskId")
+
+    override def write(record: InternalRow): Unit = {
+      val data = record.toSeq(schema).toList
+
+      val map = schema.fields.zipWithIndex
+        .map { case (field, idx) => (field.name, data(idx)) }
+        .toMap
+
+      val json = toJson(map)
+
+      val task = producer.send(new ProducerRecord(topic, json.toString))
+
+      writtenIds.append(task)
+
+    }
+
+
+    override def commit(): WriterCommitMessage = {
+      val meta = writtenIds.map(_.get())
+
+      writtenIds.clear()
+      CommittedIds(partitionId, meta.map(_.offset().toString).toSet)
+    }
+
+    override def abort(): Unit = writtenIds.map(_.cancel(true))
+
+    private def toJson(arr: List[Any]): JSONArray = {
+      JSONArray(arr.map {
+        case (innerMap: Map[String, Any]) => toJson(innerMap)
+        case (innerArray: List[Any]) => toJson(innerArray)
+        case (other) => other
+      })
+    }
+
+    private def toJson(map: Map[String, Any]): JSONObject = {
+      JSONObject(map.map {
+        case (key, innerMap: Map[String, Any]) =>
+          (key, toJson(innerMap))
+        case (key, innerArray: List[Any]) =>
+          (key, toJson(innerArray))
+        case (key, other) =>
+          (key, other)
+      })
+    }
+  }
+}
diff --git a/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaWriter.scala b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaWriter.scala
new file mode 100644
index 0000000000000..d8db21bc14fd7
--- /dev/null
+++ b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/producer/sql/KafkaWriter.scala
@@ -0,0 +1,18 @@
+package org.apache.spark.streaming.kafka.producer.sql
+
+import java.util.Optional
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, WriteSupport}
+import org.apache.spark.sql.types.StructType
+
+class KafkaWriter extends WriteSupport with Logging {
+  override def createWriter(writeUUID: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = {
+
+    val stream = options.get("path").get()
+
+    java.util.Optional.of(new KafkaDataSourceWriter(stream, schema))
+  }
+}
diff --git a/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/v2/producer/package.scala b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/v2/producer/package.scala
new file mode 100644
index 0000000000000..146fc4b6be3f9
--- /dev/null
+++ b/external/kafka-producer/src/main/scala/org/apache/spark/streaming/kafka/v2/producer/package.scala
@@ -0,0 +1,95 @@
+package org.apache.spark.streaming.kafka.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.sql.{DataFrame, SparkSession, Row}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+
+package object producer {
+
+  /**
+   * Writes a DataFrame into MapR-ES.
+   *
+   * @param dataFrame    data to be written
+   * @param sparkSession Spark Session
+   */
+  implicit class DataFrameOps(dataFrame: DataFrame)(implicit sparkSession: SparkSession) {
+    def sendToKafka(topic: String) = {
+      dataFrame
+        .write
+        .format("org.apache.spark.streaming.kafka.producer.sql.KafkaWriter")
+        .save(topic)
+    }
+  }
+
+  /**
+   * Writes RDD[String] into MapR-ES.
+   *
+   * Notice that the JSON written to MapR-ES will be {"data": "..."}
+   *
+   * @param rdd          data to be written
+   * @param sparkSession Spark Session
+   */
+  implicit class StringRDDOps(rdd: RDD[String])(implicit sparkSession: SparkSession) {
+    def sendToKafka(topic: String): Unit =
+      toDFAndSendToKafka(topic, rdd.map(Row(_)), new StructType().add("data", StringType))
+  }
+
+
+  /**
+   * Writes RDD[A] into MapR-ES.
+   *
+   * @param rdd          data to be written
+   * @param sparkSession Spark Session
+   * @tparam A
+   */
+  implicit class RDDOps[A](rdd: RDD[A])(implicit sparkSession: SparkSession) {
+
+    /**
+     * Writes RDD[A] into MapR-ES.
+     *
+     * @param topic  Kafka topic to write to
+     * @param fn     a function to transform each data item in the RDD into a Row
+     * @param schema schema of each Row
+     */
+    def sendToKafka(topic: String, fn: A => Row, schema: StructType): Unit =
+      toDFAndSendToKafka(topic, rdd.map(a => fn(a)), schema)
+  }
+
+  /**
+   * Writes DStream[String] into MapR-ES.
+   *
+   * @param stream       data to be written
+   * @param sparkSession Spark Session
+   */
+  implicit class StringDStreamOps(stream: DStream[String])(implicit sparkSession: SparkSession) {
+    def sendToKafka(topic: String): Unit = stream.foreachRDD(_.sendToKafka(topic))
+  }
+
+  /**
+   * Writes DStream[A] into MapR-ES.
+   *
+   * @param stream       data to be written
+   * @param sparkSession Spark Session
+   */
+  implicit class DStreamOps[A](stream: DStream[A])(implicit sparkSession: SparkSession) {
+
+    /**
+     * Writes DStream[A] into MapR-ES.
+     *
+     * @param topic  Kafka topic to write to
+     * @param fn     a function to transform each data item in the RDD into a Row
+     * @param schema schema of each Row
+     */
+    def sendToKafka(topic: String, fn: A => Row, schema: StructType): Unit =
+      stream
+        .map(a => fn(a))
+        .foreachRDD(rdd => toDFAndSendToKafka(topic, rdd, schema))
+  }
+
+  private def toDFAndSendToKafka(topic: String, rdd: RDD[Row], schema: StructType)(implicit sparkSession: SparkSession): Unit =
+    sparkSession
+      .createDataFrame(rdd, schema)
+      .sendToKafka(topic)
+
+}
diff --git a/pom.xml b/pom.xml
index a614714c1e6a1..263d73e58f780 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2148,6 +2148,8 @@
             <arg>-feature</arg>
            <arg>-explaintypes</arg>
            <arg>-Yno-adapted-args</arg>
+            <arg>-Xmax-classfile-name</arg>
+            <arg>128</arg>
          </args>
          <jvmArgs>
            <jvmArg>-Xms1024m</jvmArg>
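
Usage note (illustrative sketch, not part of the patch above): the snippet below shows how an application might call the new v2 producer API once the kafka-producer module is on the classpath, assuming an implicit SparkSession is in scope as the implicits in org.apache.spark.streaming.kafka.v2.producer expect. The object name SendToKafkaSketch, the app name, and the topic "/sample-stream:topic1" are placeholders, not names taken from the patch.

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.streaming.kafka.v2.producer._

    object SendToKafkaSketch extends App {
      // Placeholder MapR-ES stream/topic name.
      val topic = "/sample-stream:topic1"

      // The v2 implicits all require an implicit SparkSession.
      implicit val spark: SparkSession = SparkSession.builder()
        .appName("SendToKafkaSketch")
        .getOrCreate()

      // RDD[String]: each element is wrapped by the library as {"data": "..."}.
      val words = spark.sparkContext.parallelize(Seq("alpha", "beta", "gamma"))
      words.sendToKafka(topic)

      // RDD[A]: supply a Row transformer plus a matching schema.
      val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2)))
      val schema = new StructType()
        .add(StructField("key", StringType))
        .add(StructField("count", IntegerType))
      pairs.sendToKafka(topic, (p: (String, Int)) => Row(p._1, p._2), schema)

      // DataFrame: routed through the KafkaWriter V2 write path.
      spark.createDataFrame(pairs.map(p => Row(p._1, p._2)), schema).sendToKafka(topic)

      spark.stop()
    }

The three calls exercise, respectively, the StringRDDOps, RDDOps, and DataFrameOps implicit classes introduced by this change; all of them ultimately go through toDFAndSendToKafka and the KafkaWriter data source.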