diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..754fa33 --- /dev/null +++ b/.gitignore @@ -0,0 +1,32 @@ +*.class + +# IntelliJ +.idea/ +*.iml +*.iws +*.eml +out/ + +# Eclipse +*.pydevproject +.metadata +bin/ +tmp/ +*.tmp +*.bak +*.swp +*~.nib +local.properties +.settings/ +.loadpath +.classpath +.project +RemoteSystemsTempFiles/ +.externalToolBuilders/ +*.launch +.cproject +.buildpath +*.userlibraries + +# Grunt +node_modules \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..699da11 --- /dev/null +++ b/README.md @@ -0,0 +1,54 @@ +Fixed and updated sample implementations from the book "Apache Kafka" +===================================================================== +* Updated to Apache Kafka 0.8.1.1 +* Configuration optimized for usage on Windows machines +* Windows batch scripts fixed (taken from https://github.com/HCanber/kafka by @HCanber) +* Code examples repaired and refactored + +Initial Setup +------------- +1. [Download and install Apache Kafka](http://kafka.apache.org/downloads.html) 0.8.1.1 (I used the recommended [Scala 2.9.2 binary](https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz)) +2. Copy the scripts from [/bat](/bat) into `/bin/windows` of your Kafka installation folder (overwrite existing scripts) +3. Copy the property files from [/config](/config) into `/config` of your Kafka installation folder (overwrite existing files) + +Simple Java Producer (Chapter 5, Page 35ff.) +-------------------------------------------- +1. Open a command line in your Kafka installation folder +2. Launch Zookeeper with `.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties` +3. Open a second command line in your Kafka installation folder +4. Launch a single Kafka broker: `.\bin\windows\kafka-server-start.bat .\config\server.properties` +5. Open a third command line in your Kafka installation folder +6. Create a topic: `.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test` +7. Start a console consumer for that topic: `.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning` +8. From a fourth command line or your IDE run [SimpleProducer](/src/test/kafka/SimpleProducer.java) with topic and message as arguments: `java SimpleProducer test HelloKafka` +9. The message _HelloKafka_ should appear in the console consumer's log + +Java Producer with Message Partitioning (Chapter 5, Page 37ff.) +--------------------------------------------------------------- +1. Open a command line in your Kafka installation folder +2. Launch Zookeeper with `.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties` +3. Open a second command line in your Kafka installation folder +4. Launch the first Kafka broker: `.\bin\windows\kafka-server-start.bat .\config\server-1.properties` +5. Open a third command line in your Kafka installation folder +6. Launch the second Kafka broker: `.\bin\windows\kafka-server-start.bat .\config\server-2.properties` +7. Open a fourth command line in your Kafka installation folder +8. Create a topic: `.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test` +9. Start a console consumer for that topic: `.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning` +10. 
From a fifth command line or your IDE run [MultiBrokerProducer](/src/test/kafka/MultiBrokerProducer.java) with the topic as argument: `java MultiBrokerProducer test` +11. Ten messages starting with _This message is for key (...)_ should appear in the console consumer's log + +Simple High Level Java Consumer (Chapter 6, Page 47ff.) +------------------------------------------------------- +1. Launch the multi-broker Kafka cluster and create the topic `test` as described in steps 1-8 of __Java Producer with Message Partitioning__ +2. From another command line or your IDE run [SimpleHLConsumer](/src/test/kafka/consumer/SimpleHLConsumer.java) with the topic as argument: `java SimpleHLConsumer test` +3. From another command line or your IDE run [MultiBrokerProducer](/src/test/kafka/MultiBrokerProducer.java) with the same topic as argument: `java MultiBrokerProducer test` +4. Ten messages starting with _This message is for key (...)_ should appear in the log of the __SimpleHLConsumer__ + +Multithreaded Consumer for Multipartition Topics (Chapter 6, Page 50ff.) +------------------------------------------------------------------------ +1. Launch the multi-broker Kafka cluster and create the topic `test` as described in steps 1-8 of __Java Producer with Message Partitioning__ +2. From another command line or your IDE run [MultiThreadHLConsumer](/src/test/kafka/consumer/MultiThreadHLConsumer.java) with the topic and number of threads as arguments: `java MultiThreadHLConsumer test 4` +3. From another command line or your IDE run [MultiBrokerProducer](/src/test/kafka/MultiBrokerProducer.java) with the same topic as argument: `java MultiBrokerProducer test` +4. Ten messages starting with _Message from thread (...)_ should appear in the log of the __MultiThreadHLConsumer__ + +[![endorse](https://api.coderwall.com/bkimminich/endorsecount.png)](https://coderwall.com/bkimminich) \ No newline at end of file diff --git a/bat/README.md b/bat/README.md new file mode 100644 index 0000000..253c3a9 --- /dev/null +++ b/bat/README.md @@ -0,0 +1 @@ +Fixed Windows batch scripts for Kafka 0.8.1 from https://github.com/HCanber/kafka \ No newline at end of file diff --git a/bat/kafka-console-consumer.bat b/bat/kafka-console-consumer.bat new file mode 100644 index 0000000..94b20a4 --- /dev/null +++ b/bat/kafka-console-consumer.bat @@ -0,0 +1,20 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License.
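+ +rem Usage example (from the README walkthrough): +rem   .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning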
+ +SetLocal +set KAFKA_HEAP_OPTS=-Xmx512M +%~dp0kafka-run-class.bat kafka.consumer.ConsoleConsumer %* +EndLocal diff --git a/bat/kafka-console-producer.bat b/bat/kafka-console-producer.bat new file mode 100644 index 0000000..b116e64 --- /dev/null +++ b/bat/kafka-console-producer.bat @@ -0,0 +1,20 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +SetLocal +set KAFKA_HEAP_OPTS=-Xmx512M +%~dp0kafka-run-class.bat kafka.producer.ConsoleProducer %* +EndLocal diff --git a/bat/kafka-run-class.bat b/bat/kafka-run-class.bat new file mode 100644 index 0000000..e7f1eb5 --- /dev/null +++ b/bat/kafka-run-class.bat @@ -0,0 +1,98 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +setlocal enabledelayedexpansion + +IF [%1] EQU [] ( + echo USAGE: %0 classname [opts] + EXIT /B 1 +) +rem Using pushd popd to set BASE_DIR to the absolute path +pushd %~dp0..\.. 
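+rem %~dp0 expands to this script's folder (bin\windows), so %CD% is now the Kafka installation root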
+set BASE_DIR=%CD% +popd +set CLASSPATH= + +IF ["%SCALA_VERSION%"] EQU [""] ( + set SCALA_VERSION=2.8.0 +) + +rem Assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency" +for %%i in (%BASE_DIR%\core\target\scala-%SCALA_VERSION%\*.jar) do ( + call :concat %%i +) + +for %%i in (%BASE_DIR%\perf\target\scala-%SCALA_VERSION%\kafka*.jar) do ( + call :concat %%i +) + +rem Classpath addition for release +for %%i in (%BASE_DIR%\libs\*.jar) do ( + call :concat %%i +) +for %%i in (%BASE_DIR%\kafka_*.jar) do ( + call :concat %%i +) + + +rem JMX settings +IF ["%KAFKA_JMX_OPTS%"] EQU [""] ( + set KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false +) + +rem JMX port to use +IF ["%JMX_PORT%"] NEQ [""] ( + set KAFKA_JMX_OPTS=%KAFKA_JMX_OPTS% -Dcom.sun.management.jmxremote.port=%JMX_PORT% +) + +rem Log4j settings +IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] ( + set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%\config\tools-log4j.properties +) + +rem Generic jvm settings you want to add +IF ["%KAFKA_OPTS%"] EQU [""] ( + set KAFKA_OPTS= +) + + +rem Which java to use +IF ["%JAVA_HOME%"] EQU [""] ( + set JAVA=java +) ELSE ( + set JAVA="%JAVA_HOME%/bin/java" +) + +rem Memory options +IF ["%KAFKA_HEAP_OPTS%"] EQU [""] ( + set KAFKA_HEAP_OPTS=-Xmx256M +) + +rem JVM performance options +IF ["%KAFKA_JVM_PERFORMANCE_OPTS%"] EQU [""] ( + set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true +) + +set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %* +rem echo. +rem echo %COMMAND% +rem echo. + +%COMMAND% + +goto :eof +:concat +set CLASSPATH=%CLASSPATH%;"%1" diff --git a/bat/kafka-server-start.bat b/bat/kafka-server-start.bat new file mode 100644 index 0000000..484e136 --- /dev/null +++ b/bat/kafka-server-start.bat @@ -0,0 +1,26 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +IF [%1] EQU [] ( + echo USAGE: %0 server.properties + EXIT /B 1 +) + +SetLocal +set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties +set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G +%~dp0kafka-run-class.bat kafka.Kafka %* +EndLocal diff --git a/bat/kafka-server-stop.bat b/bat/kafka-server-stop.bat new file mode 100644 index 0000000..676577c --- /dev/null +++ b/bat/kafka-server-stop.bat @@ -0,0 +1,18 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. 
See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +wmic process where (commandline like "%%kafka.Kafka%%" and not name="wmic.exe") delete +rem ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM diff --git a/bat/kafka-topics.bat b/bat/kafka-topics.bat new file mode 100644 index 0000000..f1a9e64 --- /dev/null +++ b/bat/kafka-topics.bat @@ -0,0 +1,17 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +%~dp0kafka-run-class.bat kafka.admin.TopicCommand %* diff --git a/bat/zookeeper-server-start.bat b/bat/zookeeper-server-start.bat new file mode 100644 index 0000000..9836283 --- /dev/null +++ b/bat/zookeeper-server-start.bat @@ -0,0 +1,26 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +IF [%1] EQU [] ( + echo USAGE: %0 zookeeper.properties + EXIT /B 1 +) + +SetLocal +set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties +set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M +%~dp0kafka-run-class.bat org.apache.zookeeper.server.quorum.QuorumPeerMain %* +EndLocal diff --git a/bat/zookeeper-server-stop.bat b/bat/zookeeper-server-stop.bat new file mode 100644 index 0000000..8b57dd8 --- /dev/null +++ b/bat/zookeeper-server-stop.bat @@ -0,0 +1,17 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. 
See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +wmic process where (commandline like "%%zookeeper%%" and not name="wmic.exe") delete diff --git a/config/consumer.properties b/config/consumer.properties new file mode 100644 index 0000000..7343cbc --- /dev/null +++ b/config/consumer.properties @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.consumer.ConsumerConfig for more details + +# Zookeeper connection string +# comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" +zookeeper.connect=127.0.0.1:2181 + +# timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 + +#consumer group id +group.id=test-consumer-group + +#consumer timeout +#consumer.timeout.ms=5000 diff --git a/config/log4j.properties b/config/log4j.properties new file mode 100644 index 0000000..baa698b --- /dev/null +++ b/config/log4j.properties @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
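+ +# Note: kafka.logs.dir below is a relative path, so the log files end up under +# the directory Kafka is started from (the installation folder in the README walkthrough).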
+ +kafka.logs.dir=logs + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log +log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log +log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log +log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.cleanerAppender.File=log-cleaner.log +log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log +log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +# Turn on all our debugging info +#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender +#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender +#log4j.logger.kafka.perf=DEBUG, kafkaAppender +#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG +log4j.logger.kafka=INFO, kafkaAppender + +log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender +log4j.additivity.kafka.network.RequestChannel$=false + +#log4j.logger.kafka.network.Processor=TRACE, requestAppender +#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender +#log4j.additivity.kafka.server.KafkaApis=false +log4j.logger.kafka.request.logger=WARN, requestAppender +log4j.additivity.kafka.request.logger=false + +log4j.logger.kafka.controller=TRACE, controllerAppender +log4j.additivity.kafka.controller=false + +log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender +log4j.additivity.kafka.log.LogCleaner=false + +log4j.logger.state.change.logger=TRACE, stateChangeAppender +log4j.additivity.state.change.logger=false diff --git a/config/producer.properties b/config/producer.properties new file mode 100644 index 0000000..52a7611 --- /dev/null +++ b/config/producer.properties @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.producer.ProducerConfig for more details + +############################# Producer Basics ############################# + +# list of brokers used for bootstrapping knowledge about the rest of the cluster +# format: host1:port1,host2:port2 ... +metadata.broker.list=localhost:9092 + +# name of the partitioner class for partitioning events; default partition spreads data randomly +#partitioner.class= + +# specifies whether the messages are sent asynchronously (async) or synchronously (sync) +producer.type=sync + +# specify the compression codec for all data generated: none, gzip, snappy. +# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively +compression.codec=none + +# message encoder +serializer.class=kafka.serializer.DefaultEncoder + +# allow topic level compression +#compressed.topics= + +############################# Async Producer ############################# +# maximum time, in milliseconds, for buffering data on the producer queue +#queue.buffering.max.ms= + +# the maximum size of the blocking queue for buffering on the producer +#queue.buffering.max.messages= + +# Timeout for event enqueue: +# 0: events will be enqueued immediately or dropped if the queue is full +# -ve: enqueue will block indefinitely if the queue is full +# +ve: enqueue will block up to this many milliseconds if the queue is full +#queue.enqueue.timeout.ms= + +# the number of messages batched at the producer +#batch.num.messages= diff --git a/config/server-1.properties b/config/server-1.properties new file mode 100644 index 0000000..96b424f --- /dev/null +++ b/config/server-1.properties @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=1 + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9092 + +# Hostname the broker will bind to. 
If not set, the server will bind to all interfaces +#host.name=localhost + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port= + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma-separated list of directories under which to store log files +log.dirs=c:/tmp/kafka-logs/broker1 + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=2 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. 
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma-separated list of host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 diff --git a/config/server-2.properties b/config/server-2.properties new file mode 100644 index 0000000..08718a8 --- /dev/null +++ b/config/server-2.properties @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=2 + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9093 + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +#host.name=localhost + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port= + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma-separated list of directories under which to store log files +log.dirs=c:/tmp/kafka-logs/broker2 + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. 
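+# (The README's multi-broker walkthrough overrides this default when creating the +# test topic explicitly with --partitions 4 and --replication-factor 2.)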
+num.partitions=2 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma-separated list of host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 diff --git a/config/server.properties b/config/server.properties new file mode 100644 index 0000000..b84ea15 --- /dev/null +++ b/config/server.properties @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9092 + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +#host.name=localhost + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port= + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma-separated list of directories under which to store log files +log.dirs=c:/tmp/kafka-logs + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=2 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. 
Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma-separated list of host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 diff --git a/config/test-log4j.properties b/config/test-log4j.properties new file mode 100644 index 0000000..a3ae33f --- /dev/null +++ b/config/test-log4j.properties @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
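+ +# Same layout as config/log4j.properties, but with perf, request and controller +# logging at DEBUG/TRACE so test runs can be traced in detail.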
+log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.kafkaAppender.File=logs/server.log +log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.stateChangeAppender.File=logs/state-change.log +log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.requestAppender.File=logs/kafka-request.log +log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.controllerAppender.File=logs/controller.log +log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +# Turn on all our debugging info +#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender +#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender +log4j.logger.kafka.perf=DEBUG, kafkaAppender +log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG +log4j.logger.kafka=INFO, kafkaAppender + +log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender +log4j.additivity.kafka.network.RequestChannel$=false + +#log4j.logger.kafka.network.Processor=TRACE, requestAppender +#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender +#log4j.additivity.kafka.server.KafkaApis=false +log4j.logger.kafka.request.logger=TRACE, requestAppender +log4j.additivity.kafka.request.logger=false + +log4j.logger.kafka.controller=TRACE, controllerAppender +log4j.additivity.kafka.controller=false + +log4j.logger.state.change.logger=TRACE, stateChangeAppender +log4j.additivity.state.change.logger=false + + diff --git a/config/tools-log4j.properties b/config/tools-log4j.properties new file mode 100644 index 0000000..7924049 --- /dev/null +++ b/config/tools-log4j.properties @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=WARN, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + + diff --git a/config/zookeeper.properties b/config/zookeeper.properties new file mode 100644 index 0000000..6a12311 --- /dev/null +++ b/config/zookeeper.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir=c:/tmp/zookeeper +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/lib/activation-1.1.jar b/lib/activation-1.1.jar new file mode 100644 index 0000000..53f82a1 Binary files /dev/null and b/lib/activation-1.1.jar differ diff --git a/lib/jline-0.9.94.jar b/lib/jline-0.9.94.jar new file mode 100644 index 0000000..dede372 Binary files /dev/null and b/lib/jline-0.9.94.jar differ diff --git a/lib/jopt-simple-3.2.jar b/lib/jopt-simple-3.2.jar new file mode 100644 index 0000000..5637362 Binary files /dev/null and b/lib/jopt-simple-3.2.jar differ diff --git a/lib/junit-4.1.jar b/lib/junit-4.1.jar new file mode 100644 index 0000000..2c46d67 Binary files /dev/null and b/lib/junit-4.1.jar differ diff --git a/lib/kafka_2.9.2-0.8.1.1.jar b/lib/kafka_2.9.2-0.8.1.1.jar new file mode 100644 index 0000000..f9caec5 Binary files /dev/null and b/lib/kafka_2.9.2-0.8.1.1.jar differ diff --git a/lib/log4j-1.2.15.jar b/lib/log4j-1.2.15.jar new file mode 100644 index 0000000..c930a6a Binary files /dev/null and b/lib/log4j-1.2.15.jar differ diff --git a/lib/mail-1.4.jar b/lib/mail-1.4.jar new file mode 100644 index 0000000..fd4555b Binary files /dev/null and b/lib/mail-1.4.jar differ diff --git a/lib/metrics-core-2.2.0.jar b/lib/metrics-core-2.2.0.jar new file mode 100644 index 0000000..0f6d1cb Binary files /dev/null and b/lib/metrics-core-2.2.0.jar differ diff --git a/lib/scala-library-2.9.2.jar b/lib/scala-library-2.9.2.jar new file mode 100644 index 0000000..a871b3e Binary files /dev/null and b/lib/scala-library-2.9.2.jar differ diff --git a/lib/slf4j-api-1.7.2.jar b/lib/slf4j-api-1.7.2.jar new file mode 100644 index 0000000..1a88708 Binary files /dev/null and b/lib/slf4j-api-1.7.2.jar differ diff --git a/lib/snappy-java-1.0.5.jar b/lib/snappy-java-1.0.5.jar new file mode 100644 index 0000000..6dc413d Binary files /dev/null and b/lib/snappy-java-1.0.5.jar differ diff --git a/lib/zkclient-0.3.jar b/lib/zkclient-0.3.jar new file mode 100644 index 0000000..6dbd23c Binary files /dev/null and b/lib/zkclient-0.3.jar differ diff --git 
a/lib/zookeeper-3.3.4.jar b/lib/zookeeper-3.3.4.jar new file mode 100644 index 0000000..3ca785d Binary files /dev/null and b/lib/zookeeper-3.3.4.jar differ diff --git a/src/test/kafka/MultiBrokerProducer.java b/src/test/kafka/MultiBrokerProducer.java new file mode 100644 index 0000000..3711615 --- /dev/null +++ b/src/test/kafka/MultiBrokerProducer.java @@ -0,0 +1,34 @@ +package test.kafka; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + +import java.util.Properties; +import java.util.Random; + +public class MultiBrokerProducer { + private static Producer<String, String> producer; + private final Properties properties = new Properties(); + + public MultiBrokerProducer() { + properties.put("metadata.broker.list", "localhost:9092, localhost:9093"); + properties.put("serializer.class", "kafka.serializer.StringEncoder"); + properties.put("partitioner.class", "test.kafka.SimplePartitioner"); + properties.put("request.required.acks", "1"); + ProducerConfig config = new ProducerConfig(properties); + producer = new Producer<>(config); + } + + public static void main(String[] args) { + MultiBrokerProducer sp = new MultiBrokerProducer(); + Random random = new Random(); + String topic = args[0]; + for (long i = 0; i < 10; i++) { + String key = String.valueOf(random.nextInt(255)); + String msg = "This message is for key - " + key; + producer.send(new KeyedMessage<>(topic, key, msg)); // the key must be sent along, otherwise SimplePartitioner is never consulted + } + producer.close(); + } +} \ No newline at end of file diff --git a/src/test/kafka/SimplePartitioner.java b/src/test/kafka/SimplePartitioner.java new file mode 100644 index 0000000..ecfe1b1 --- /dev/null +++ b/src/test/kafka/SimplePartitioner.java @@ -0,0 +1,19 @@ +package test.kafka; + +import kafka.producer.Partitioner; +import kafka.utils.VerifiableProperties; + +@SuppressWarnings("UnusedDeclaration") +public class SimplePartitioner implements Partitioner { + public SimplePartitioner(VerifiableProperties properties) { + } + + public int partition(Object key, int numberOfPartitions) { + int partition = 0; + int intKey = Integer.parseInt((String) key); // keys arrive as strings (see MultiBrokerProducer) + if (intKey > 0) { + partition = intKey % numberOfPartitions; + } + return partition; + } +} \ No newline at end of file diff --git a/src/test/kafka/SimpleProducer.java b/src/test/kafka/SimpleProducer.java new file mode 100644 index 0000000..39624dd --- /dev/null +++ b/src/test/kafka/SimpleProducer.java @@ -0,0 +1,26 @@ +package test.kafka; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + +import java.util.Properties; + +public class SimpleProducer { + private static Producer<String, String> producer; + private final Properties properties = new Properties(); + + public SimpleProducer() { + properties.put("metadata.broker.list", "localhost:9092"); + properties.put("serializer.class", "kafka.serializer.StringEncoder"); + properties.put("request.required.acks", "1"); + producer = new Producer<>(new ProducerConfig(properties)); + } + + public static void main(String[] args) { + new SimpleProducer(); // initialize the static producer before sending + String topic = args[0]; + String msg = args[1]; + producer.send(new KeyedMessage<>(topic, msg)); + producer.close(); + } +} \ No newline at end of file diff --git a/src/test/kafka/consumer/ConsumerThread.java b/src/test/kafka/consumer/ConsumerThread.java new file mode 100644 index 0000000..e59ffa3 --- /dev/null +++ b/src/test/kafka/consumer/ConsumerThread.java @@ -0,0 +1,24 @@ +package test.kafka.consumer; + +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; + +final class ConsumerThread implements 
Runnable { + + private KafkaStream<byte[], byte[]> stream; + private int threadNumber; + + public ConsumerThread(KafkaStream<byte[], byte[]> stream, int threadNumber) { + this.threadNumber = threadNumber; + this.stream = stream; + } + + public void run() { + ConsumerIterator<byte[], byte[]> it = stream.iterator(); + while (it.hasNext()) { + System.out.println("Message from thread " + threadNumber + ": " + new String(it.next().message())); + } + System.out.println("Shutting down thread: " + threadNumber); + } + +} diff --git a/src/test/kafka/consumer/MultiThreadHLConsumer.java b/src/test/kafka/consumer/MultiThreadHLConsumer.java new file mode 100644 index 0000000..d1bd81d --- /dev/null +++ b/src/test/kafka/consumer/MultiThreadHLConsumer.java @@ -0,0 +1,68 @@ +package test.kafka.consumer; + +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class MultiThreadHLConsumer { + + private ExecutorService executor; + private final ConsumerConnector consumer; + private final String topic; + + public MultiThreadHLConsumer(String zookeeper, String groupId, String topic) { + Properties properties = new Properties(); + properties.put("zookeeper.connect", zookeeper); + properties.put("group.id", groupId); + properties.put("zookeeper.session.timeout.ms", "500"); + properties.put("zookeeper.sync.time.ms", "250"); + properties.put("auto.commit.interval.ms", "1000"); + + consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); + this.topic = topic; + } + + public void testConsumer(int threadCount) { + Map<String, Integer> topicCount = new HashMap<>(); + topicCount.put(topic, threadCount); + + Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); + List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); + + executor = Executors.newFixedThreadPool(threadCount); + + int threadNumber = 0; + for (final KafkaStream<byte[], byte[]> stream : streams) { + executor.submit(new ConsumerThread(stream, threadNumber)); + threadNumber++; + } + + try { // without this wait the subsequent shutdown happens immediately before any messages are delivered + Thread.sleep(10000); + } catch (InterruptedException ie) { + + } + if (consumer != null) { + consumer.shutdown(); + } + if (executor != null) { + executor.shutdown(); + } + } + + public static void main(String[] args) { + String topic = args[0]; + int threadCount = Integer.parseInt(args[1]); + MultiThreadHLConsumer multiThreadHLConsumer = new MultiThreadHLConsumer("localhost:2181", "testgroup", topic); + multiThreadHLConsumer.testConsumer(threadCount); + } + +} diff --git a/src/test/kafka/consumer/SimpleHLConsumer.java b/src/test/kafka/consumer/SimpleHLConsumer.java new file mode 100644 index 0000000..62247a4 --- /dev/null +++ b/src/test/kafka/consumer/SimpleHLConsumer.java @@ -0,0 +1,54 @@ +package test.kafka.consumer; + +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public class SimpleHLConsumer { + + private final ConsumerConnector consumer; + private final String topic; + + public SimpleHLConsumer(String zookeeper, String groupId, String topic) { + Properties props = new 
Properties(); + props.put("zookeeper.connect", zookeeper); + props.put("group.id", groupId); + props.put("zookeeper.session.timeout.ms", "500"); + props.put("zookeeper.sync.time.ms", "250"); + props.put("auto.commit.interval.ms", "1000"); + + consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); + this.topic = topic; + } + + public void testConsumer() { + Map<String, Integer> topicCount = new HashMap<>(); + topicCount.put(topic, 1); + + Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); + List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); + for (final KafkaStream<byte[], byte[]> stream : streams) { + ConsumerIterator<byte[], byte[]> it = stream.iterator(); + while (it.hasNext()) { + System.out.println("Message from Single Topic: " + new String(it.next().message())); + } + } + if (consumer != null) { + consumer.shutdown(); + } + } + + public static void main(String[] args) { + String topic = args[0]; + SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic); + simpleHLConsumer.testConsumer(); + } + +}
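
A note on shutdown: `MultiThreadHLConsumer` relies on a fixed `Thread.sleep(10000)` before calling `consumer.shutdown()`, since without the wait the shutdown would run before any message arrives. A gentler pattern is to let each stream's iterator time out so the worker threads finish on their own. The following is only a sketch, assuming the same Kafka 0.8.1.1 high-level consumer API and the `consumer.timeout.ms` setting that is commented out in [/config/consumer.properties](/config/consumer.properties); the class name `TimeoutAwareConsumerThread` is made up for this example:

```java
package test.kafka.consumer;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;

// Hypothetical variant of ConsumerThread; requires consumer.timeout.ms to be set,
// e.g. properties.put("consumer.timeout.ms", "5000") in MultiThreadHLConsumer.
final class TimeoutAwareConsumerThread implements Runnable {

    private final KafkaStream<byte[], byte[]> stream;
    private final int threadNumber;

    TimeoutAwareConsumerThread(KafkaStream<byte[], byte[]> stream, int threadNumber) {
        this.stream = stream;
        this.threadNumber = threadNumber;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        try {
            // hasNext() throws ConsumerTimeoutException once no message arrives in time
            while (it.hasNext()) {
                System.out.println("Message from thread " + threadNumber + ": " + new String(it.next().message()));
            }
        } catch (ConsumerTimeoutException e) {
            // no message within consumer.timeout.ms - treat as end of stream
        }
        System.out.println("Shutting down thread: " + threadNumber);
    }
}
```

With this in place, `testConsumer` could replace the fixed sleep with `executor.shutdown()` followed by `executor.awaitTermination(...)` and only then call `consumer.shutdown()`.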