
[kafka_consumer] enable fetching consumer offsets from kafka #654

Merged: 20 commits into master from jaime/kafkaoffsets2, Nov 7, 2017
Commits (20):
767ee0d  [kafka_consumer] adding support for kafka_consumer offsets. (truthbk, Aug 2, 2017)
a3c61df  [kafka_consumer][test] enabling kafka consumer offset tests - attempt… (truthbk, Aug 2, 2017)
d2ecd93  [kafka_consumer] ignore topics for now, process kafka offsets. (truthbk, Aug 3, 2017)
2669165  [kafka_consumer] improve coordinator_id collection + cleanup. (truthbk, Aug 3, 2017)
58ff0c7  [kafka_consumer] kafka consumer offsets: handle error code. (truthbk, Aug 3, 2017)
91fed9d  [kafka_consumer] optional round robin to collect offsets if no coord_id (truthbk, Aug 3, 2017)
a47978f  [kafka_consumer][test] wait for cluster stabilization up to MAX_SETUP… (truthbk, Aug 3, 2017)
261edd0  [kafka_consumer][test] adding more flavors to Travis. (truthbk, Aug 4, 2017)
06987ea  [kafka_consumer] adding new configuration elements. (truthbk, Aug 4, 2017)
aa9f015  [kafka_consumer] be less smart about what we collect. (truthbk, Oct 13, 2017)
f68058a  [kafka_consumer] populate topics dict correctly. (truthbk, Oct 13, 2017)
afa0e4e  [kafka_consumer] always refresh metadata + flakes (truthbk, Oct 16, 2017)
7aca935  [kafka_consumer] adding ZK min collection interval + fixes. (truthbk, Oct 24, 2017)
19e49cf  [kafka_consumer] addressing flake8s. (truthbk, Oct 24, 2017)
b0969d6  [kafka_consumer] fix consumer offsets accidental removal. Cleanup. (truthbk, Oct 25, 2017)
8718cf1  [kafka_consumer] bumping up context limit upper bound. (truthbk, Oct 26, 2017)
107e956  [kafka_consumer] bump CI to 0.10.2.1-1 (truthbk, Nov 2, 2017)
67f17df  [oracle][travis] fixing typo with docker tags (truthbk, Nov 2, 2017)
4441a00  [kafka_consumer] removing stale comment. (truthbk, Nov 2, 2017)
412fca8  Merge branch 'master' into jaime/kafkaoffsets2 (masci, Nov 3, 2017)
.travis.yml (9 changes: 7 additions & 2 deletions)

@@ -68,8 +68,13 @@ env:
  - TRAVIS_FLAVOR=haproxy FLAVOR_VERSION=1.4.26
  - TRAVIS_FLAVOR=haproxy FLAVOR_VERSION=1.5.11
  - TRAVIS_FLAVOR=haproxy FLAVOR_VERSION=1.6.9
-  - TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.10.1.0-1 FLAVOR_OPTIONS='zookeeper'
-  - TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.10.1.0-1 FLAVOR_OPTIONS='kafka'
+  - TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.8.1.1-1 FLAVOR_OPTIONS='zookeeper'
+  - TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.9.0.1-1 FLAVOR_OPTIONS='zookeeper'
+  - TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.9.0.1-1 FLAVOR_OPTIONS='kafka'
+  - TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.10.2.1 FLAVOR_OPTIONS='zookeeper'
+  - TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.10.2.1 FLAVOR_OPTIONS='kafka'
+  - TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.11.0.1 FLAVOR_OPTIONS='zookeeper'
+  - TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.11.0.1 FLAVOR_OPTIONS='kafka'
  - TRAVIS_FLAVOR=kafka FLAVOR_VERSION=0.10.1.0-1
  - TRAVIS_FLAVOR=kong FLAVOR_VERSION=0.9.0
  - TRAVIS_FLAVOR=kube_dns FLAVOR_VERSION=0.1.0

kafka_consumer/check.py (478 changes: 327 additions & 151 deletions)

Large diffs are not rendered by default.
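
Since the check.py diff is collapsed, a minimal sketch for orientation only: reading Kafka-stored (rather than Zookeeper-stored) consumer group offsets with kafka-python, the client library this check builds on. The helper name, arguments, and return shape are illustrative assumptions, not the check's actual API.

```python
# Illustrative sketch only: not the actual check.py implementation.
from kafka import KafkaConsumer, TopicPartition


def fetch_kafka_stored_offsets(brokers, group, topic_partitions):
    """Read the offsets a consumer group has committed to Kafka itself."""
    consumer = KafkaConsumer(bootstrap_servers=brokers, group_id=group,
                             enable_auto_commit=False)
    offsets = {}
    for topic, partitions in topic_partitions.items():
        for partition in partitions:
            tp = TopicPartition(topic, partition)
            committed = consumer.committed(tp)  # None if nothing committed yet
            if committed is not None:
                offsets[(topic, partition)] = committed
    consumer.close()
    return offsets


# e.g. fetch_kafka_stored_offsets(['localhost:9092'], 'my_consumer', {'marvel': [0]})
```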

kafka_consumer/ci/kafka_consumer.rake (20 changes: 13 additions & 7 deletions)

@@ -16,25 +16,31 @@ def zookeeper_version
  ENV['ZOOKEEPER_VERSION'] || '3.4.9'
end

+kafka_legacy = '0.8.2.0'
+
namespace :ci do
  namespace :kafka_consumer do |flavor|
    task before_install: ['ci:common:before_install']

    task :install do
      Rake::Task['ci:common:install'].invoke('kafka_consumer')
-      sh %(EXTERNAL_PORT=9092 EXTERNAL_JMX_PORT=9999 CONSUMER_OFFSET_STORAGE=#{kafka_consumer_options} KAFKA_TOPICS=#{kafka_topics} \
-      ZOOKEEPER_VERSION=#{zookeeper_version} \
+      sh %(EXTERNAL_PORT=9092 EXTERNAL_JMX_PORT=9999 KAFKA_OFFSETS_STORAGE=#{kafka_consumer_options} KAFKA_CREATE_TOPICS=#{kafka_topics} \
+      ZOOKEEPER_VERSION=#{zookeeper_version} JMX_PORT=9999 KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \
+      KAFKA_ADVERTISED_HOST_NAME="172.17.0.1" KAFKA_ZOOKEEPER_CONNECT="zookeeper:2181" \
      docker-compose -f #{ENV['TRAVIS_BUILD_DIR']}/kafka_consumer/ci/resources/docker-compose-single-broker.yml up -d)
      Wait.for 2181
      Wait.for 9092
-      wait_on_docker_logs('resources_kafka_1', 20, '[Kafka Server 1001], started')
-      wait_on_docker_logs('resources_zookeeper_1', 20, 'NodeExists for /brokers/ids')
-      wait_on_docker_logs('resources_kafka_1', 20, 'Created topic "marvel"')
-      wait_on_docker_logs('resources_kafka_1', 20, 'Created topic "dc"')
+      wait_on_docker_logs('resources_kafka_1', 20, ' started (kafka.server.KafkaServer)')
+      wait_on_docker_logs('resources_zookeeper_1', 20, 'NoNode for /brokers')
+      if Gem::Version.new(kafka_consumer_version) > Gem::Version.new(kafka_legacy)
+        wait_on_docker_logs('resources_kafka_1', 20, 'Created topic "marvel"')
+        wait_on_docker_logs('resources_kafka_1', 20, 'Created topic "dc"')
+      end

      sh %(EXTERNAL_PORT=9091 EXTERNAL_JMX_PORT=9998 CONSUMER_OFFSET_STORAGE=#{kafka_consumer_options} KAFKA_TOPICS=#{kafka_topics} \
      ZOOKEEPER_VERSION=#{zookeeper_version} \
      docker-compose -f #{ENV['TRAVIS_BUILD_DIR']}/kafka/ci/resources/docker-compose-single-broker.yml scale kafka=2)
-      wait_on_docker_logs('resources_kafka_2', 20, '[Kafka Server 1002], started')
+      wait_on_docker_logs('resources_kafka_2', 20, ' started (kafka.server.KafkaServer)')
    end

    task before_script: ['ci:common:before_script'] do

kafka_consumer/ci/resources/docker-compose-single-broker.yml (13 changes: 6 additions & 7 deletions)

@@ -12,12 +12,11 @@ services:
      - "${EXTERNAL_PORT}:9092"
      - "${EXTERNAL_JMX_PORT}:9999"
    environment:
-      # KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
-      KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
-      KAFKA_CREATE_TOPICS: "${KAFKA_TOPICS}"
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_HEAP_OPTS: "-Xmx256M -Xms128M"
-      KAFKA_OFFSETS_STORAGE: "${CONSUMER_OFFSET_STORAGE}"
-      JMX_PORT: "9999"
+      - KAFKA_OFFSETS_STORAGE
+      - KAFKA_CREATE_TOPICS
+      - KAFKA_ADVERTISED_HOST_NAME
+      - KAFKA_HEAP_OPTS
+      - JMX_PORT
+      - KAFKA_ZOOKEEPER_CONNECT
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

kafka_consumer/conf.yaml.example (13 changes: 13 additions & 0 deletions)

@@ -3,6 +3,8 @@ init_config:
  # zk_timeout: 5
  # Customize the Kafka connection timeout here
  # kafka_timeout: 5
+  # Customize the max number of retries per failed query to Kafka
+  # kafka_retries: 3
  # Customize the number of seconds that must elapse between running this check.
  # When checking Kafka offsets stored in Zookeeper, a single run of this check
  # must stat zookeeper more than the number of consumers * topic_partitions
@@ -16,19 +18,30 @@ init_config:
  # the self-discovery of consumer groups, topics and partitions) the check
  # will collect metrics for at most 100 partitions.

+
instances:
  # In a production environment, it's often useful to specify multiple
  # Kafka / Zookeeper nodes for a single check instance. This way you
  # only generate a single check process, but if one host goes down,
  # KafkaClient / KazooClient will try contacting the next host.
  # Details: https://github.com/DataDog/dd-agent/issues/2943
+  #
+  # If you wish to only collect consumer offsets from Kafka, because
+  # you're using the new-style consumers, you can comment out all
+  # zk_* configuration elements below.
+  # Please note that unlisted consumer groups are not supported at
+  # the moment when Zookeeper consumer offset collection is disabled.
  - kafka_connect_str:
      - localhost:9092
      - another_kafka_broker:9092
    zk_connect_str:
      - localhost:2181
      - another_zookeeper:2181
+    # zk_iteration_ival: 1  # how many seconds between ZK consumer offset
+    #                         collections. If kafka consumer offsets are
+    #                         disabled, this has no effect.
    # zk_prefix: /0.8
+    # kafka_consumer_offsets: false
    consumer_groups:
      my_consumer:              # consumer group name
        my_topic: [0, 1, 4, 12] # topic_name: list of partitions
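
The options above interact roughly as follows: Kafka-stored offsets are collected when kafka_consumer_offsets is true, Zookeeper-stored offsets only when zk_connect_str is configured, and zk_iteration_ival throttles how often the Zookeeper pass runs. Below is a hedged sketch of that gating logic; the helper and its signature are hypothetical, not code from check.py.

```python
import time


# Hypothetical illustration of the documented flags; not actual check code.
def plan_collection(instance, last_zk_run):
    """Decide which offset sources to query on this check run."""
    now = time.time()
    collect_kafka = bool(instance.get('kafka_consumer_offsets'))
    zk_ival = instance.get('zk_iteration_ival', 1)
    # Zookeeper offsets require a connect string, and are collected at most
    # once every zk_iteration_ival seconds.
    collect_zk = ('zk_connect_str' in instance and
                  now - last_zk_run >= zk_ival)
    return collect_kafka, collect_zk


# With only kafka_connect_str and kafka_consumer_offsets set, this returns
# (True, False): the check never touches Zookeeper.
```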
kafka_consumer/test_kafka_consumer.py (158 changes: 129 additions & 29 deletions)

@@ -14,12 +14,13 @@
from kafka import KafkaConsumer, KafkaProducer

from kazoo.client import KazooClient
+from docker import Client  # required by test setup

# project
-from tests.checks.common import AgentCheckTest
+from tests.checks.common import AgentCheckTest, log


-instances = [{
+zk_instance = {
    'kafka_connect_str': '172.17.0.1:9092',
    'zk_connect_str': 'localhost:2181',
    # 'zk_prefix': '/0.8',

@@ -28,7 +29,18 @@
            'marvel': [0]
        }
    }
-}]
+}
+
+kafka_instance = {
+    'kafka_connect_str': '172.17.0.1:9092',
+    'kafka_consumer_offsets': True,
+    'consumer_groups': {
+        'my_consumer': {
+            'marvel': [0]
+        }
+    }
+}


BROKER_METRICS = [
    'kafka.broker_offset',

@@ -43,25 +55,31 @@
PARTITIONS = [0, 1]

SHUTDOWN = threading.Event()
+CLUSTER_READY = 'Stabilized group my_consumer'
+KAFKA_IMAGE_NAME = 'wurstmeister/kafka'
+DOCKER_TO = 10

class Producer(threading.Thread):

    def run(self):
-        producer = KafkaProducer(bootstrap_servers=instances[0]['kafka_connect_str'])
+        producer = KafkaProducer(bootstrap_servers=zk_instance['kafka_connect_str'])

        while not SHUTDOWN.is_set():
            for partition in PARTITIONS:
-                producer.send('marvel', b"Peter Parker", partition=partition)
-                producer.send('marvel', b"Bruce Banner", partition=partition)
-                producer.send('marvel', b"Tony Stark", partition=partition)
-                producer.send('marvel', b"Johnny Blaze", partition=partition)
-                producer.send('marvel', b"\xc2BoomShakalaka", partition=partition)
-                producer.send('dc', b"Diana Prince", partition=partition)
-                producer.send('dc', b"Bruce Wayne", partition=partition)
-                producer.send('dc', b"Clark Kent", partition=partition)
-                producer.send('dc', b"Arthur Curry", partition=partition)
-                producer.send('dc', b"\xc2ShakalakaBoom", partition=partition)
-                time.sleep(1)
+                try:
+                    producer.send('marvel', b"Peter Parker", partition=partition)
+                    producer.send('marvel', b"Bruce Banner", partition=partition)
+                    producer.send('marvel', b"Tony Stark", partition=partition)
+                    producer.send('marvel', b"Johnny Blaze", partition=partition)
+                    producer.send('marvel', b"\xc2BoomShakalaka", partition=partition)
+                    producer.send('dc', b"Diana Prince", partition=partition)
+                    producer.send('dc', b"Bruce Wayne", partition=partition)
+                    producer.send('dc', b"Clark Kent", partition=partition)
+                    producer.send('dc', b"Arthur Curry", partition=partition)
+                    producer.send('dc', b"\xc2ShakalakaBoom", partition=partition)
+                    time.sleep(1)
+                except Exception:
+                    pass


class ZKConsumer(threading.Thread):

@@ -70,7 +88,7 @@ def run(self):
        zk_path_topic_tmpl = '/consumers/my_consumer/offsets/'
        zk_path_partition_tmpl = zk_path_topic_tmpl + '{topic}/{partition}'

-        zk_conn = KazooClient(instances[0]['zk_connect_str'], timeout=10)
+        zk_conn = KazooClient(zk_instance['zk_connect_str'], timeout=10)
        zk_conn.start()

        for topic in TOPICS:

@@ -81,7 +99,7 @@ def run(self):
                zk_conn.ensure_path(node_path)
                zk_conn.set(node_path, str(0))

-        consumer = KafkaConsumer(bootstrap_servers=instances[0]['kafka_connect_str'],
+        consumer = KafkaConsumer(bootstrap_servers=zk_instance['kafka_connect_str'],
                                 group_id="my_consumer",
                                 auto_offset_reset='earliest',
                                 enable_auto_commit=False)

@@ -112,7 +130,7 @@ def run(self):
class KConsumer(threading.Thread):

    def run(self):
-        consumer = KafkaConsumer(bootstrap_servers=instances[0]['kafka_connect_str'],
+        consumer = KafkaConsumer(bootstrap_servers=kafka_instance['kafka_connect_str'],
                                 group_id="my_consumer",
                                 auto_offset_reset='earliest')
        consumer.subscribe(TOPICS)

@@ -125,6 +143,7 @@
class TestKafka(AgentCheckTest):
    """Basic Test for kafka_consumer integration."""
    CHECK_NAME = 'kafka_consumer'
+    MAX_SETUP_WAIT = 60
    THREADS = [Producer()]

    def __init__(self, *args, **kwargs):

@@ -137,26 +156,80 @@ def __init__(self, *args, **kwargs):

    @classmethod
    def setUpClass(cls):
+        """
+        Set up the consumer + producer and wait for the cluster to stabilize.
+        """
+        start = time.time()
+
        cls.THREADS[0].start()
        time.sleep(5)
        cls.THREADS[1].start()
        time.sleep(5)

+        try:
+            cli = Client(base_url='unix://var/run/docker.sock',
+                         timeout=DOCKER_TO)
+            containers = cli.containers()
+
+            nodes = []
+            for c in containers:
+                if KAFKA_IMAGE_NAME in c.get('Image'):
+                    nodes.append(c)
+
+            elapsed = time.time() - start
+            while elapsed < cls.MAX_SETUP_WAIT:
+                for node in nodes:
+                    _log = cli.logs(node.get('Id'))
+                    if CLUSTER_READY in _log:
+                        return
+
+                time.sleep(1)
+                elapsed = time.time() - start
+        except Exception:
+            pass
+
+        log.info('Unable to verify kafka cluster status - tests may fail')
+
    @classmethod
    def tearDownClass(cls):
        SHUTDOWN.set()
        for t in cls.THREADS:
            if t.is_alive():
                t.join(5)

+    def is_supported(self, flavors):
+        supported = False
+        version = os.environ.get('FLAVOR_VERSION')
+        flavor = os.environ.get('FLAVOR_OPTIONS', '').lower()
+
+        if not version:
+            return False
+
+        for f in flavors:
+            if f == flavor:
+                supported = True
+
+        if not supported:
+            return False
+
+        if version != 'latest':
+            version = version.split('-')[0]
+            version = tuple(s for s in version.split('.') if s.strip())
+            if flavor == 'kafka' and version <= self.check.LAST_ZKONLY_VERSION:
+                supported = False
+
+        return supported
+
    def test_check_zk(self):
        """
        Testing Kafka_consumer check.
        """

-        if os.environ.get('FLAVOR_OPTIONS', '').lower() == "kafka":
-            raise SkipTest("Skipping test - environment not configured for ZK consumer offsets")
+        if not self.is_supported(['zookeeper']):
+            raise SkipTest("Skipping test - not supported in current environment")

+        instances = [zk_instance]
        self.run_check({'instances': instances})

        for instance in instances:

@@ -168,7 +241,7 @@ def test_check_zk(self):
                        for mname in BROKER_METRICS:
                            self.assertMetric(mname, tags=tags, at_least=1)
                        for mname in CONSUMER_METRICS:
-                            self.assertMetric(mname, tags=tags + ["consumer_group:{}".format(name)], at_least=1)
+                            self.assertMetric(mname, tags=tags + ["source:zk", "consumer_group:{}".format(name)], at_least=1)

        # let's reassert for the __consumer_offsets - multiple partitions
        self.assertMetric('kafka.broker_offset', at_least=1)

@@ -180,16 +253,17 @@ def test_check_nogroups_zk(self):
        Testing Kafka_consumer check grabbing groups from ZK.
        """

-        if os.environ.get('FLAVOR_OPTIONS', '').lower() == "kafka":
-            raise SkipTest("Skipping test - environment not configured for ZK consumer offsets")
+        if not self.is_supported(['zookeeper']):
+            raise SkipTest("Skipping test - not supported in current environment")

-        nogroup_instances = copy.deepcopy(instances)
-        nogroup_instances[0].pop('consumer_groups')
-        nogroup_instances[0]['monitor_unlisted_consumer_groups'] = True
+        nogroup_instance = copy.deepcopy(zk_instance)
+        nogroup_instance.pop('consumer_groups')
+        nogroup_instance['monitor_unlisted_consumer_groups'] = True

-        self.run_check({'instances': nogroup_instances})
+        instances = [nogroup_instance]
+        self.run_check({'instances': instances})

-        for instance in nogroup_instances:
+        for instance in instances:
            for topic in TOPICS:
                if topic != '__consumer_offsets':
                    for partition in PARTITIONS:

@@ -198,9 +272,35 @@
                        for mname in BROKER_METRICS:
                            self.assertMetric(mname, tags=tags, at_least=1)
                        for mname in CONSUMER_METRICS:
-                            self.assertMetric(mname, tags=tags + ["consumer_group:my_consumer"], at_least=1)
+                            self.assertMetric(mname, tags=tags + ["source:zk", "consumer_group:my_consumer"], at_least=1)
                else:
                    for mname in BROKER_METRICS + CONSUMER_METRICS:
                        self.assertMetric(mname, at_least=1)

        self.coverage_report()

+    def test_check_kafka(self):
+        """
+        Testing Kafka_consumer check against Kafka-stored consumer offsets.
+        """
+
+        if not self.is_supported(['kafka']):
+            raise SkipTest("Skipping test - not supported in current environment")
+
+        instances = [kafka_instance]
+        self.run_check({'instances': instances})
+
+        for instance in instances:
+            for name, consumer_group in instance['consumer_groups'].iteritems():
+                for topic, partitions in consumer_group.iteritems():
+                    for partition in partitions:
+                        tags = ["topic:{}".format(topic),
+                                "partition:{}".format(partition)]
+                        for mname in BROKER_METRICS:
+                            self.assertMetric(mname, tags=tags, at_least=1)
+                        for mname in CONSUMER_METRICS:
+                            self.assertMetric(mname, tags=tags + ["source:kafka", "consumer_group:{}".format(name)], at_least=1)
+
+        # let's reassert for the __consumer_offsets - multiple partitions
+        self.assertMetric('kafka.broker_offset', at_least=1)
+        self.coverage_report()

requirements-test.txt (1 change: 1 addition & 0 deletions)

@@ -1,3 +1,4 @@
+docker-py==1.10.6
docker-compose==1.9.0
flake8==2.5.1
mock==2.0.0