Commit

first commit
trannguyenhan committed Jan 17, 2022
commit 47af24d (root commit, 0 parents)
Showing 4 changed files with 169 additions and 0 deletions.
26 changes: 26 additions & 0 deletions README.md
@@ -0,0 +1,26 @@
# spark-streaming-kafka-demo
A demo Spark Streaming consumer that reads data from Kafka.

Start Kafka and create a topic named `topic-1`:

```bash
bin/kafka-topics.sh --create --topic topic-1 --bootstrap-server localhost:9092
```
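
Optionally, verify that the topic was created (`--describe` prints partition and replica details):

```bash
bin/kafka-topics.sh --describe --topic topic-1 --bootstrap-server localhost:9092
```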

Start a console _producer_ to send messages to the topic:

```bash
bin/kafka-console-producer.sh --topic topic-1 --bootstrap-server localhost:9092
```
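
Before involving Spark, you can confirm that produced messages reach the broker by running a console consumer in a second terminal:

```bash
bin/kafka-console-consumer.sh --topic topic-1 --from-beginning --bootstrap-server localhost:9092
```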

Build the project into a `.jar` file:

```bash
mvn clean package
```
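
The assembly plugin (configured in `pom.xml` below) builds a fat jar, so after a successful build the runnable artifact should land in `target/` with the name used in the next step:

```bash
ls target/
# expect something like: SparkStreamingDemo-V1-jar-with-dependencies.jar
```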

Run the `.jar` file on Spark with `spark-submit`:

```bash
spark-submit --class demo.sparkstreaming.StreamingConsumer target/SparkStreamingDemo-V1-jar-with-dependencies.jar
```
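
Each line typed into the console producer should then appear on the job's stdout at roughly one-second intervals (the batch duration configured in `StreamingConsumer`). A hypothetical session:

```bash
# producer terminal (type some lines):
hello kafka
hello spark

# spark-submit terminal (output, interleaved with Spark's own logs):
hello kafka
hello spark
```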
71 changes: 71 additions & 0 deletions pom.xml
@@ -0,0 +1,71 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>SparkStreamingDemo</groupId>
    <artifactId>SparkStreamingDemo</artifactId>
    <version>V1</version>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.7.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
        <!-- Not "provided": the Kafka integration must be bundled into the fat jar -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.1.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <!-- "provided": spark-submit supplies Spark's own classes at runtime -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.1.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <release>11</release>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <archive>
                                <manifest>
                                    <mainClass>demo.sparkstreaming.StreamingConsumer</mainClass>
                                </manifest>
                            </archive>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
48 changes: 48 additions & 0 deletions src/demo/sparkstreaming/StreamingConsumer.java
@@ -0,0 +1,48 @@
package demo.sparkstreaming;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import demo.sparkstreaming.properties.SSKafkaProperties;

public class StreamingConsumer {
    public static void main(String[] args) throws InterruptedException {
        // Local mode with a 1-second batch interval; use local[*] for more cores
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Spark Streaming Consumer");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Kafka topics to subscribe to
        Collection<String> topics = Arrays.asList("topic-1");

        // Create an input DStream that consumes messages from the Kafka topics
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, SSKafkaProperties.getInstance()));

        // Keep only the record values and print each micro-batch on the driver
        JavaDStream<String> lines = stream.map(record -> record.value());
        lines.foreachRDD(rdd -> {
            // collect() is fine for a small demo; avoid it on high-volume streams
            List<String> batch = rdd.collect();
            for (String line : batch) {
                System.out.println(line);
            }
        });

        // Start the computation and wait for it to terminate
        jssc.start();
        jssc.awaitTermination();
    }
}
24 changes: 24 additions & 0 deletions src/demo/sparkstreaming/properties/SSKafkaProperties.java
@@ -0,0 +1,24 @@
package demo.sparkstreaming.properties;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;

public class SSKafkaProperties {
    /**
     * Kafka consumer configuration for the Spark Streaming direct stream.
     * @return consumer parameters keyed by Kafka configuration name
     */
    public static Map<String, Object> getInstance() {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "0");
        // Read from the beginning of the topic when no committed offset exists
        kafkaParams.put("auto.offset.reset", "earliest");
        // Let Spark control offset handling instead of auto-committing
        kafkaParams.put("enable.auto.commit", false);

        return kafkaParams;
    }
}
