diff --git a/README.md b/README.md
new file mode 100644
index 0000000..06bd9b2
--- /dev/null
+++ b/README.md
@@ -0,0 +1,26 @@
+# spark-streaming-kafka-demo
+A demo Spark Streaming consumer that reads data from Kafka.
+
+Start Kafka and create a topic named `topic-1`:
+
+```bash
+bin/kafka-topics.sh --create --topic topic-1 --bootstrap-server localhost:9092
+```
+
+Start a console _producer_ for the topic:
+
+```bash
+bin/kafka-console-producer.sh --topic topic-1 --bootstrap-server localhost:9092
+```
+
+Build the project into a runnable `.jar` (dependencies bundled):
+
+```bash
+mvn clean package
+```
+
+Run the `.jar` on Spark with `spark-submit`:
+
+```bash
+spark-submit --class demo.sparkstreaming.StreamingConsumer target/SparkStreamingDemo-V1-jar-with-dependencies.jar
+```
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..56eabd8
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,71 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>SparkStreamingDemo</groupId>
+  <artifactId>SparkStreamingDemo</artifactId>
+  <version>V1</version>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>2.7.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.12</artifactId>
+      <version>2.7.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
+      <version>3.1.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_2.12</artifactId>
+      <version>3.1.1</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>src</sourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.8.1</version>
+        <configuration>
+          <release>11</release>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <archive>
+            <manifest>
+              <mainClass>demo.sparkstreaming.StreamingConsumer</mainClass>
+            </manifest>
+          </archive>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/src/demo/sparkstreaming/StreamingConsumer.java b/src/demo/sparkstreaming/StreamingConsumer.java
new file mode 100644
index 0000000..0f10d52
--- /dev/null
+++ b/src/demo/sparkstreaming/StreamingConsumer.java
@@ -0,0 +1,48 @@
+package demo.sparkstreaming;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka010.ConsumerStrategies;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.apache.spark.streaming.kafka010.LocationStrategies;
+
+import demo.sparkstreaming.properties.SSKafkaProperties;
+
+public class StreamingConsumer {
+    public static void main(String[] args) throws InterruptedException {
+        SparkConf conf = new SparkConf().setMaster("local").setAppName("Spark Streaming Consumer");
+        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
+
+        // The list of Kafka topics to subscribe to
+        Collection<String> topics = Arrays.asList("topic-1");
+
+        // Create an input DStream that consumes messages from the Kafka topics
+        JavaInputDStream<ConsumerRecord<String, String>> stream;
+        stream = KafkaUtils.createDirectStream(jssc,
+                LocationStrategies.PreferConsistent(),
+                ConsumerStrategies.<String, String>Subscribe(topics, SSKafkaProperties.getInstance()));
+
+        // Keep only the message value of each Kafka record
+        JavaDStream<String> lines = stream.map(kafkaRecord -> kafkaRecord.value());
+        lines.cache().foreachRDD(rdd -> {
+            // collect() pulls the whole batch to the driver; acceptable for a small demo
+            List<String> list = rdd.collect();
+            for (String l : list) {
+                System.out.println(l);
+            }
+        });
+
+        // Start the computation and block until it terminates
+        jssc.start();
+        jssc.awaitTermination();
+    }
+}
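A note on the batch-printing logic in `StreamingConsumer` above: `collect()` ships every record of a batch back to the driver, which is fine for a demo but will not scale to larger topics. For quick inspection, DStreams already provide a built-in `print()` operator that shows the first ten elements of each batch; a minimal sketch of that alternative, replacing the whole `foreachRDD` block:

```java
// Alternative to collect() + manual println: print() outputs the first
// 10 elements of every batch (RDD) on the driver.
lines.print();
```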
diff --git a/src/demo/sparkstreaming/properties/SSKafkaProperties.java b/src/demo/sparkstreaming/properties/SSKafkaProperties.java
new file mode 100644
index 0000000..ede197d
--- /dev/null
+++ b/src/demo/sparkstreaming/properties/SSKafkaProperties.java
@@ -0,0 +1,24 @@
+package demo.sparkstreaming.properties;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+public class SSKafkaProperties {
+    /**
+     * Kafka consumer configuration for the Spark Streaming integration.
+     * @return the parameter map expected by ConsumerStrategies.Subscribe
+     */
+    public static Map<String, Object> getInstance() {
+        Map<String, Object> kafkaParams = new HashMap<>();
+        kafkaParams.put("bootstrap.servers", "localhost:9092");
+        kafkaParams.put("key.deserializer", StringDeserializer.class);
+        kafkaParams.put("value.deserializer", StringDeserializer.class);
+        kafkaParams.put("group.id", "0");
+        kafkaParams.put("auto.offset.reset", "earliest");
+        kafkaParams.put("enable.auto.commit", false);
+
+        return kafkaParams;
+    }
+}
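Instead of typing into the console producer, the topic can also be fed programmatically. Below is a minimal, hypothetical helper (a `TestProducer` class that is not part of the diff above) built on the `kafka-clients` dependency already declared in the `pom.xml`; the class name, message contents, and message count are illustrative assumptions:

```java
package demo.sparkstreaming;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical test helper, not part of the diff above: sends a few
// messages to topic-1 so the streaming job has something to print.
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources flushes and closes the producer on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord<>("topic-1", "message-" + i));
            }
        }
    }
}
```

Each `message-<i>` line should then show up in the `spark-submit` console within roughly one batch interval (one second here).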