Skip to content

Commit

Permalink
Document kafka_client_api_version
Browse files Browse the repository at this point in the history
This previously existed but wasn't documented. Additionally, this
respaces some of the other comments to the new line length.
  • Loading branch information
jeffwidman committed Aug 15, 2019
1 parent 9050d57 commit 8e91590
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ instances:
- localhost:9092
# - <KAFKA_BROKER_ENDPOINT>:9092

## @param kafka_client_api_version - string - optional
## Specify the highest client protocol version supported by all brokers in the cluster.
##
## This is a performance optimization. If this is not set, then the check automatically probes
## the cluster for broker version during the connection bootstrapping process. Explicitly setting
## this bypasses that probe, saving 3-5 network calls depending on the broker version. Note that
## probing randomly picks a broker to probe, so in a mixed-version cluster, probing returns a
## non-deterministic result.
#
# kafka_client_api_version: "2.3.0"

## @param consumer_groups - object - optional
## When using Kafka to store consumer offsets each level is mandatory.
## When using zookeeper to store consumer offsets each level is optional.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

class KafkaCheck(AgentCheck):
"""
Check the offsets and lag of Kafka consumers.
Check the offsets and lag of Kafka consumers. This check also returns broker highwater offsets.
This check also returns broker highwater offsets.
For details about the supported options, see the associated `conf.yaml.example`.
"""

__NAMESPACE__ = 'kafka'
Expand Down
63 changes: 26 additions & 37 deletions kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@

class LegacyKafkaCheck_0_10_2(AgentCheck):
"""
Check the offsets and lag of Kafka consumers.
Check the offsets and lag of Kafka consumers. This check also returns broker highwater offsets.
This check also returns broker highwater offsets.
This is used if the `post_0_10_2` config is set to false
This is the legacy codepath which is used when either broker version < 0.10.2 or zk_connect_str has a value.
"""

SOURCE_TYPE_NAME = 'kafka'
Expand Down Expand Up @@ -55,13 +53,11 @@ def __init__(self, name, init_config, instances):
self._zk_client.start()

def check(self, instance):
# For calculating lag, we have to fetch offsets from both kafka and
# zookeeper. There's a potential race condition because whichever one we
# check first may be outdated by the time we check the other. Better to
# check consumer offset before checking broker offset because worst case
# is that overstates consumer lag a little. Doing it the other way can
# understate consumer lag to the point of having negative consumer lag,
# which just creates confusion because it's theoretically impossible.
# For calculating consumer lag, we have to fetch both the consumer offset and the broker highwater offset.
# There's a potential race condition because whichever one we check first may be outdated by the time we check
# the other. It is better to check consumer offsets before checking broker offsets because the worst case is
# that this overstates consumer lag a little. Doing it the other way can understate consumer lag to the point of
# having negative consumer lag, which just creates confusion because it's theoretically impossible.

# If monitor_unlisted_consumer_groups is True, fetch all groups stored in ZK
consumer_groups = None
Expand All @@ -82,18 +78,18 @@ def check(self, instance):
self._kafka_client._maybe_refresh_metadata()

# Fetch consumer group offsets from Kafka

# For legacy reasons, this only fetches consumer offsets from kafka if zookeeper is omitted or
# kafka_consumer_offsets is True.
if is_affirmative(instance.get('kafka_consumer_offsets', self._zk_hosts_ports is None)):
# For now, consumer groups are mandatory if not using ZK
if self._zk_hosts_ports is None and not consumer_groups:
raise ConfigurationError(
'Invalid configuration - if you are not collecting '
'offsets from ZK you _must_ specify consumer groups'
'Invalid configuration - if you are collecting consumer offsets from Kafka, and your brokers are '
'older than 0.10.2, then you _must_ specify consumer groups and their topics. Older brokers lack '
'the necessary protocol support to determine which topics a consumer is consuming. See KIP-88 for '
'details.'
)
# kafka-python automatically probes the cluster for broker version
# and then stores it. Note that this returns the first version
# found, so in a mixed-version cluster this will be a
# non-deterministic result.
#
# Kafka 0.8.2 added support for storing consumer offsets in Kafka.
if self._kafka_client.config.get('api_version') >= (0, 8, 2):
kafka_consumer_offsets, topics = self._get_kafka_consumer_offsets(instance, consumer_groups)
Expand Down Expand Up @@ -146,9 +142,12 @@ def _create_kafka_client(self):
bootstrap_servers=kafka_conn_str,
client_id='dd-agent',
request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000,
# If `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for broker
# version during the bootstrapping process. Note that probing randomly picks a broker to probe, so in a
# mixed-version cluster, probing returns a non-deterministic result.
api_version=self.instance.get('kafka_client_api_version'),
# While we check for SSL params, if not present they will default
# to the kafka-python values for plaintext connections
# While we check for SSL params, if not present they will default to the kafka-python values for plaintext
# connections
security_protocol=self.instance.get('security_protocol', 'PLAINTEXT'),
sasl_mechanism=self.instance.get('sasl_mechanism'),
sasl_plain_username=self.instance.get('sasl_plain_username'),
Expand All @@ -168,8 +167,7 @@ def _make_blocking_req(self, request, node_id=None):
node_id = self._kafka_client.least_loaded_node()

while not self._kafka_client.ready(node_id):
# poll until the connection to broker is ready, otherwise send()
# will fail with NodeNotReadyError
# poll until the connection to broker is ready, otherwise send() will fail with NodeNotReadyError
self._kafka_client.poll()

future = self._kafka_client.send(node_id, request)
Expand Down Expand Up @@ -222,21 +220,13 @@ def _process_highwater_offsets(self, response):

def _get_broker_offsets(self, topics):
"""
Fetch highwater offsets for each topic/partition from Kafka cluster.
Fetch highwater offsets for topic_partitions in the Kafka cluster.
Do this for all partitions in the cluster because even if it has no
consumers, we may want to measure whether producers are successfully
producing. No need to limit this for performance because fetching broker
offsets from Kafka is a relatively inexpensive operation.
Do this for all partitions in the cluster because, even if a partition has no consumers, we may want to measure
whether producers are successfully producing.
Sends one OffsetRequest per broker to get offsets for all partitions
where that broker is the leader:
Sends one OffsetRequest per broker to get offsets for all partitions where that broker is the leader:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)
Can we cleanup connections on agent restart?
Brokers before 0.9 - accumulate stale connections on restarts.
In 0.9 Kafka added connections.max.idle.ms
https://issues.apache.org/jira/browse/KAFKA-1282
"""

# Connect to Kafka
Expand Down Expand Up @@ -292,9 +282,8 @@ def _report_consumer_metrics(
)
if (topic, partition) not in unled_topic_partitions:
self.log.warn(
"Consumer group: %s has offsets for topic: %s "
"partition: %s, but that topic partition doesn't actually "
"exist in the cluster.",
"Consumer group: %s has offsets for topic: %s partition: %s, but that topic partition doesn't "
"actually exist in the cluster.",
consumer_group,
topic,
partition,
Expand Down

0 comments on commit 8e91590

Please sign in to comment.