Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
bkimminich committed Jul 8, 2014
0 parents commit a45dfbd
Show file tree
Hide file tree
Showing 39 changed files with 1,175 additions and 0 deletions.
32 changes: 32 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
*.class

# Intellij
.idea/
*.iml
*.iws
*.eml
out/

# Eclipse
*.pydevproject
.metadata
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.settings/
.loadpath
.classpath
.project
RemoteSystemsTempFiles/
.externalToolBuilders/
*.launch
.cproject
.buildpath
*.userlibraries

# Grunt
node_modules
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
Fixed and updated sample implementations from the book "Apache Kafka"
=====================================================================
* Updated to Apache Kafka 0.8.1.1
* Configuration optimized for usage on Windows machines
* Windows batch scripts fixed (taken from https://github.com/HCanber/kafka by @HCanber)
* Code examples repaired and refactored

Initial Setup
-------------
1. [Download and install Apache Kafka](http://kafka.apache.org/downloads.html) 0.8.1.1 (I used the recommended [Scala 2.9.2 binary](https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz))
2. Copy the scripts from [/bat](/bat) into `/bin/windows` of your Kafka installation folder (overwrite existing scripts)
3. Copy the property files from [/config](/config) into `/config` of your Kafka installation folder (overwrite existing files)

Simple Java Producer (Chapter 5, Page 35ff.)
--------------------------------------------
1. Open command line in your Kafka installation folder
2. Launch Zookeeper with `.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties`
3. Open a second command line in your Kafka installation folder
4. Launch single Kafka broker: `.\bin\windows\kafka-server-start.bat .\config\server.properties`
5. Open a third command line in your Kafka installation folder
6. Create a topic: `.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test`
7. Start a console consumer for that topic: `.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning`
8. From a fourth command line or your IDE run [SimpleProducer](/src/test/kafka/SimpleProducer.java) with topic and message as arguments: `java SimpleProducer test HelloKafka`
9. The message _HelloKafka_ should appear in the console consumer's log

Java Producer with Message Partitioning (Chapter 5, Page 37ff.)
---------------------------------------------------------------
1. Open command line in your Kafka installation folder
2. Launch Zookeeper with `.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties`
3. Open a second command line in your Kafka installation folder
4. Launch first Kafka broker: `.\bin\windows\kafka-server-start.bat .\config\server-1.properties`
5. Open a third command line in your Kafka installation folder
4. Launch second Kafka broker: `.\bin\windows\kafka-server-start.bat .\config\server-2.properties`
5. Open a fourth command line in your Kafka installation folder
6. Create a topic: `.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test`
7. Start a console consumer for that topic: `.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning`
8. From a fifth command line or your IDE run [MultiBrokerProducer](/src/test/kafka/MultiBrokerProducer.java) with topic as argument: `java MultiBrokerProducer test`
9. Ten messages starting with _This message is for key (...)_ should appear in the console consumer's log

Simple High Level Java Consumer (Chapter 6, Page 47ff.)
-------------------------------------------------------
1. Launch multi-broker Kafka cluster and create topic `test` as described in step 1-6 of __Java Producer with Message Partitioning__
2. From another command line or your IDE run [SimpleHLConsumer](/src/test/kafka/consumer/SimpleHLConsumer.java) with topic as argument: `java SimpleHLConsumer test`
3. From another command line or your IDE run [MultiBrokerProducer](/src/test/kafka/MultiBrokerProducer.java) with same topic as argument: `java MultiBrokerProducer test`
4. Ten messages starting with _This message is for key (...)_ should appear in the log of the __SimpleHLConsumer__

Multithreaded Consumer for Multipartition Topics (Chapter 6, Page 50ff.)
------------------------------------------------------------------------
1. Launch multi-broker Kafka cluster and create topic `test` as described in step 1-6 of __Java Producer with Message Partitioning__
2. From another command line or your IDE run [MultiThreadHLConsumer](/src/test/kafka/consumer/MultiThreadHLConsumer.java) with topic and number of threads as argument: `java MultiThreadHLConsumer test 4`
3. From another command line or your IDE run [MultiBrokerProducer](/src/test/kafka/MultiBrokerProducer.java) with same topic as argument: `java MultiBrokerProducer test`
4. Ten messages starting with _Message from thread (...)_ should appear in the log of the __MultiThreadHLConsumer__

[![endorse](https://api.coderwall.com/bkimminich/endorsecount.png)](https://coderwall.com/bkimminich)
1 change: 1 addition & 0 deletions bat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed Windows batch scripts for Kafka 0.8.1 from https://github.com/HCanber/kafka
20 changes: 20 additions & 0 deletions bat/kafka-console-consumer.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M
%~dp0kafka-run-class.bat kafka.consumer.ConsoleConsumer %*
EndLocal
20 changes: 20 additions & 0 deletions bat/kafka-console-producer.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M
%~dp0kafka-run-class.bat kafka.producer.ConsoleProducer %*
EndLocal
98 changes: 98 additions & 0 deletions bat/kafka-run-class.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

setlocal enabledelayedexpansion

IF [%1] EQU [] (
echo USAGE: %0 classname [opts]
EXIT /B 1
)
rem Using pushd popd to set BASE_DIR to the absolute path
pushd %~dp0..\..
set BASE_DIR=%CD%
popd
set CLASSPATH=

IF ["%SCALA_VERSION%"] EQU [""] (
set SCALA_VERSION=2.8.0
)

rem Assume all dependencies have been packaged into one jar with sbt-assembly's task "assembly-package-dependency"
for %%i in (%BASE_DIR%\core\target\scala-%SCALA_VERSION%\*.jar) do (
call :concat %%i
)

for %%i in (%BASE_DIR%\perf\target\scala-%SCALA_VERSION%\kafka*.jar) do (
call :concat %%i
)

rem Classpath addition for release
for %%i in (%BASE_DIR%\libs\*.jar) do (
call :concat %%i
)
for %%i in (%BASE_DIR%\kafka_*.jar) do (
call :concat %%i
)


rem JMX settings
IF ["%KAFKA_JMX_OPTS%"] EQU [""] (
set KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
)

rem JMX port to use
IF ["%JMX_PORT%"] NEQ [""] (
set KAFKA_JMX_OPTS=%KAFKA_JMX_OPTS% -Dcom.sun.management.jmxremote.port=%JMX_PORT%
)

rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%\config\tools-log4j.properties
)

rem Generic jvm settings you want to add
IF ["%KAFKA_OPTS%"] EQU [""] (
set KAFKA_OPTS=
)


rem Which java to use
IF ["%JAVA_HOME%"] EQU [""] (
set JAVA=java
) ELSE (
set JAVA="%JAVA_HOME%/bin/java"
)

rem Memory options
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
set KAFKA_HEAP_OPTS=-Xmx256M
)

rem JVM performance options
IF ["%KAFKA_JVM_PERFORMANCE_OPTS%"] EQU [""] (
set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true
)

set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %*
rem echo.
rem echo %COMMAND%
rem echo.

%COMMAND%

goto :eof
:concat
set CLASSPATH=%CLASSPATH%;"%1"
26 changes: 26 additions & 0 deletions bat/kafka-server-start.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

IF [%1] EQU [] (
echo USAGE: %0 server.properties
EXIT /B 1
)

SetLocal
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:~dp0../../config/log4j.properties
set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
%~dp0kafka-run-class.bat kafka.Kafka %*
EndLocal
18 changes: 18 additions & 0 deletions bat/kafka-server-stop.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

wmic process where (commandline like "%%kafka.Kafka%%" and not name="wmic.exe") delete
rem ps ax | grep -i 'kafka.Kafka' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM
17 changes: 17 additions & 0 deletions bat/kafka-topics.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

%~dp0kafka-run-class.bat kafka.admin.TopicCommand %*
26 changes: 26 additions & 0 deletions bat/zookeeper-server-start.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

IF [%1] EQU [] (
echo USAGE: %0 zookeeper.properties
EXIT /B 1
)

SetLocal
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
%~dp0kafka-run-class.bat org.apache.zookeeper.server.quorum.QuorumPeerMain %*
EndLocal
17 changes: 17 additions & 0 deletions bat/zookeeper-server-stop.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

wmic process where (commandline like "%%zookeeper%%" and not name="wmic.exe") delete
29 changes: 29 additions & 0 deletions config/consumer.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.consumer.ConsumerConfig for more details

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=127.0.0.1:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000
Loading

0 comments on commit a45dfbd

Please sign in to comment.