Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
gnschenker committed Oct 23, 2018
0 parents commit 34bc619
Show file tree
Hide file tree
Showing 10 changed files with 348 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build/
**/.DS_Store
.gradle
33 changes: 33 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
= Kafka Essentials

This repository contains the sample code discussed and used in the Confluent course *Kafka Technical Essentials*.

== Build Java Examples

1. From within the folder containing the `build.gradle` file run the following command:
+
[cols="50,50",grid="none",frame="none"]
|===
a|
Bash

[source,subs="verbatim,quotes"]
----
$ *docker container run --rm \
-v ${PWD}:/home/gradle/project \
-v ${HOME}/.gradle:/root/.gradle \
-w /home/gradle/project \
frekele/gradle:latest gradle build*
----
a|
Powershell

[source,subs="verbatim,quotes"]
----
PS> *docker container run --rm `
-v ${PWD}:/home/gradle/project `
-v ${HOME}/.gradle:/root/.gradle `
-w /home/gradle/project `
frekele/gradle:latest gradle build*
----
|===
55 changes: 55 additions & 0 deletions m-01/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
version: '3.5'
services:
  # Single-node ZooKeeper, used by both Kafka and Schema Registry.
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.0"
    networks:
      - kafka-net
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  # Single Kafka broker (enterprise image: ships the Confluent metrics reporter
  # that Control Center reads from).
  kafka:
    image: "confluentinc/cp-enterprise-kafka:5.0.0"
    hostname: kafka
    networks:
      - kafka-net
    environment:
      KAFKA_BROKER_ID: 101
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      # Single broker: internal topics cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_METRIC_REPORTERS: "io.confluent.metrics.reporter.ConfluentMetricsReporter"
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: "kafka:9092"
      # FIX: the metrics topic otherwise defaults to replication factor 3,
      # which a one-broker cluster cannot satisfy (m-04/docker-compose.yml
      # in this same commit already sets this).
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1

  # Confluent Schema Registry, backed by the ZooKeeper node above.
  schema-registry:
    image: "confluentinc/cp-schema-registry:5.0.0"
    networks:
      - kafka-net
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081

  # Confluent Control Center web UI, reachable on the host at :9021.
  # All *_REPLICATION settings are 1 for the single-broker setup.
  control-center:
    image: confluentinc/cp-enterprise-control-center:5.0.0
    restart: always
    networks:
      - kafka-net
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "kafka:9092"
      CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1
      CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 1
      CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1
      CONTROL_CENTER_METRICS_TOPIC_REPLICATION: 1
      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
      CONTROL_CENTER_STREAMS_CONSUMER_REQUEST_TIMEOUT_MS: "960032"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

networks:
  kafka-net:
36 changes: 36 additions & 0 deletions m-02/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
version: '3.5'
services:
  # Single-node ZooKeeper, used by both Kafka and Schema Registry.
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.0"
    networks:
      - kafka-net
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  # Single Kafka broker, advertised inside the compose network as kafka:9092.
  kafka:
    image: "confluentinc/cp-enterprise-kafka:5.0.0"
    hostname: kafka
    networks:
      - kafka-net
    environment:
      # NOTE(review): HOSTNAME looks redundant with the compose-level
      # `hostname: kafka` above — confirm the image actually reads this env var.
      HOSTNAME: kafka
      KAFKA_BROKER_ID: 101
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      # Single broker: internal topics cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Short rebalance delay so demo consumer groups start quickly.
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  # Confluent Schema Registry, backed by the ZooKeeper node above.
  schema-registry:
    image: "confluentinc/cp-schema-registry:5.0.0"
    networks:
      - kafka-net
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081

networks:
  kafka-net:
37 changes: 37 additions & 0 deletions m-03/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
version: '3.5'
services:
  # Single-node ZooKeeper, used by both Kafka and Schema Registry.
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.0"
    networks:
      - kafka-net
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  # Single Kafka broker, advertised inside the compose network as kafka:9092.
  kafka:
    image: "confluentinc/cp-enterprise-kafka:5.0.0"
    hostname: kafka
    networks:
      - kafka-net
    environment:
      # NOTE(review): HOSTNAME looks redundant with the compose-level
      # `hostname: kafka` above — confirm the image actually reads this env var.
      HOSTNAME: kafka
      KAFKA_BROKER_ID: 101
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      # Single broker: internal topics cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Short rebalance delay so demo consumer groups start quickly.
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  # Confluent Schema Registry, backed by the ZooKeeper node above.
  schema-registry:
    image: "confluentinc/cp-schema-registry:5.0.0"
    hostname: schema-registry
    networks:
      - kafka-net
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081

networks:
  kafka-net:
38 changes: 38 additions & 0 deletions m-04/consumer/dotnet/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Linq;
using System.Text;
using System.Collections.Generic;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace consumer_net {
    /// <summary>
    /// Console consumer: subscribes to "hello_world_topic" and prints every
    /// record (plus any errors) until the process is killed.
    /// </summary>
    class Program {
        static void Main (string[] args) {
            Console.WriteLine ("Starting Consumer!");

            // Consumer settings: fixed group id, earliest-offset start, and
            // auto-commit every 5 seconds.
            // NOTE(review): the compose files in this commit name the broker
            // service "kafka", not "kafka-1" — confirm this host resolves.
            var settings = new Dictionary<string, object> {
                { "group.id", "dotnet-consumer-group" },
                { "bootstrap.servers", "kafka-1:9092" },
                { "auto.commit.interval.ms", 5000 },
                { "auto.offset.reset", "earliest" }
            };

            // One UTF-8 string deserializer instance serves both key and value.
            var utf8 = new StringDeserializer (Encoding.UTF8);
            using (var consumer = new Consumer<string, string> (settings, utf8, utf8)) {
                // Wire up the event handlers before subscribing.
                consumer.OnMessage += (sender, record) =>
                    Console.WriteLine ($"Read ('{record.Key}', '{record.Value}') from: {record.TopicPartitionOffset}");

                consumer.OnError += (sender, err) =>
                    Console.WriteLine ($"Error: {err}");

                consumer.OnConsumeError += (sender, record) =>
                    Console.WriteLine ($"Consume error ({record.TopicPartitionOffset}): {record.Error}");

                consumer.Subscribe ("hello_world_topic");

                // Drive the event callbacks forever, polling in 100 ms slices.
                for (;;) {
                    consumer.Poll (TimeSpan.FromMilliseconds (100));
                }
            }
        }
    }
}
58 changes: 58 additions & 0 deletions m-04/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
version: '3.5'
services:
  # Single-node ZooKeeper, used by both Kafka and Schema Registry.
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.0"
    networks:
      - kafka-net
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  # Single Kafka broker; port 9092 is also published to the host for the
  # m-04 client examples.
  kafka:
    image: "confluentinc/cp-enterprise-kafka:5.0.0"
    hostname: kafka
    networks:
      - kafka-net
    ports:
      # FIX: quote the mapping — port mappings should always be strings in
      # YAML (and "9021:9021" below is already quoted in this file).
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 101
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      # Single broker: internal topics cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      KAFKA_METRIC_REPORTERS: "io.confluent.metrics.reporter.ConfluentMetricsReporter"
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: "kafka:9092"
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1

  # Confluent Schema Registry, backed by the ZooKeeper node above.
  schema-registry:
    image: "confluentinc/cp-schema-registry:5.0.0"
    networks:
      - kafka-net
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_LISTENERS: http://schema-registry:8081

  # Confluent Control Center web UI, reachable on the host at :9021.
  # All *_REPLICATION settings are 1 for the single-broker setup.
  control-center:
    image: confluentinc/cp-enterprise-control-center:5.0.0
    networks:
      - kafka-net
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "kafka:9092"
      CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1
      CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 1
      CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1
      CONTROL_CENTER_METRICS_TOPIC_REPLICATION: 1
      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
      CONTROL_CENTER_STREAMS_CONSUMER_REQUEST_TIMEOUT_MS: "960032"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

networks:
  kafka-net:
31 changes: 31 additions & 0 deletions m-04/producer/java/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Build for the m-04 basic producer; packaged as a fat jar via the shadow plugin.
plugins {
    id 'com.github.johnrengelman.shadow' version '2.0.4'
    id 'java'
    id 'application'
}

mainClassName = 'clients.BasicProducer'

repositories {
    mavenCentral()
    // FIX: use HTTPS — Confluent serves this repository over TLS, and modern
    // Gradle versions reject plain-HTTP repositories by default.
    maven { url "https://packages.confluent.io/maven/" }
}

jar {
    baseName = 'basic-producer'
}

// Produce a single self-contained artifact named app.jar (no classifier/version
// suffix) so the Docker run commands can reference a stable file name.
shadowJar {
    baseName = 'app'
    classifier = null
    version = null
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compile group: "org.apache.kafka", name: "kafka-clients", version: "2.0.0"
    compile group: "org.slf4j", name: "slf4j-log4j12", version: "1.7.25"
    // Confluent monitoring interceptors, referenced by interceptor.classes
    // in BasicProducer.
    compile group: "io.confluent", name: "monitoring-interceptors", version: "5.0.0"
}
48 changes: 48 additions & 0 deletions m-04/producer/java/src/main/java/clients/BasicProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package clients;

import java.lang.InterruptedException;
import java.lang.Math;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Endless demo producer: simulates temperature readings from 500 weather
 * stations and publishes them to the "readings" topic as string key/value
 * pairs ("station-N" -> temperature).
 */
public class BasicProducer {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("*** Starting Readings Producer ***");

        // String serialization for key and value, plus the Confluent monitoring
        // interceptor (feeds Control Center's stream monitoring).
        Properties settings = new Properties();
        settings.put("client.id", "readings-producer");
        settings.put("bootstrap.servers", "kafka:9092");
        settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        settings.put("interceptor.classes", "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor");

        final KafkaProducer<String, String> producer = new KafkaProducer<>(settings);

        // Flush buffered records and release client resources on Ctrl-C/SIGTERM.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("### Stopping Readings Producer ###");
            producer.close();
        }));

        final String topic = "readings";
        final int numberOfStations = 500;
        final double min = -20.0;
        final double max = 20.0;

        // Fix one random baseline temperature per station so successive
        // readings for a station wander around a stable average.
        final double[] avgTemp = new double[numberOfStations];
        for (int i = 0; i < numberOfStations; i++) {
            avgTemp[i] = (Math.random() * ((max - min) + 1)) + min;
        }

        // Emit one reading per station, pause briefly, repeat forever.
        while (true) {
            for (int i = 1; i <= numberOfStations; i++) {
                final String key = "station-" + i;
                // Jitter of +/-10 degrees around the station's baseline.
                final double temp = avgTemp[i - 1] + ((Math.random() - 0.5) * 20);
                final String value = Double.toString(temp);
                final ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
                producer.send(record);
            }
            // BUG FIX: the original "(int)Math.random() * 1000" casts before
            // multiplying, so the interval was always 0 and the sleep a no-op.
            // Parenthesize so the pause is a random 0-999 ms, as the comment
            // intended.
            final int interval = (int) (Math.random() * 1000); // wait a fraction of a second
            TimeUnit.MILLISECONDS.sleep(interval);
        }
    }
}
9 changes: 9 additions & 0 deletions m-04/producer/java/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

# Root logger option
# WARN keeps the Kafka client libraries quiet; lower to INFO/DEBUG when
# troubleshooting connectivity.
log4j.rootLogger=WARN, stdout

# Direct log messages to stdout
# NOTE(review): the appender is named "stdout" but its Target is System.err —
# presumably deliberate (keeps log noise off the program's stdout); confirm
# before renaming either side.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.err
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

0 comments on commit 34bc619

Please sign in to comment.