From 8e915909c15d4019f0a6c2ecdf190a7ba8a8a22a Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Thu, 15 Aug 2019 11:30:53 -0700 Subject: [PATCH] Document kafka_client_api_version This previously existed but wasn't documented. Additionally, this respaces some of the other comments to the new line length. --- .../kafka_consumer/data/conf.yaml.example | 11 ++++ .../kafka_consumer/kafka_consumer.py | 4 +- .../kafka_consumer/legacy_0_10_2.py | 63 ++++++++----------- 3 files changed, 39 insertions(+), 39 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example index 9f6285f82ae06..04e4a1e3b9ebd 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example +++ b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example @@ -36,6 +36,17 @@ instances: - localhost:9092 # - :9092 + ## @param kafka_client_api_version - string - optional + ## Specify the highest client protocol version supported by all brokers in the cluster. + ## + ## This is a performance optimization. If this is not set, then the check automatically probes + ## the cluster for broker version during the connection bootstrapping process. Explicitly setting + ## this bypasses that probe, saving 3-5 network calls depending on the broker version. Note that + ## probing randomly picks a broker to probe, so in a mixed-version cluster, probing returns a + ## non-deterministic result. + # + # kafka_client_api_version: "2.3.0" + ## @param consumer_groups - object - optional ## When using Kafka to store consumer offsets each level is mandatory. ## When using zookeeper to store consumer offsets each level is optional. diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 72c8782b29619..4c14bb9bc22fc 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -8,9 +8,9 @@ class KafkaCheck(AgentCheck): """ - Check the offsets and lag of Kafka consumers. + Check the offsets and lag of Kafka consumers. This check also returns broker highwater offsets. - This check also returns broker highwater offsets. + For details about the supported options, see the associated `conf.yaml.example`. """ __NAMESPACE__ = 'kafka' diff --git a/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py b/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py index 7b95dacc872b3..a4027cc7c9d62 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py @@ -22,11 +22,9 @@ class LegacyKafkaCheck_0_10_2(AgentCheck): """ - Check the offsets and lag of Kafka consumers. + Check the offsets and lag of Kafka consumers. This check also returns broker highwater offsets. - This check also returns broker highwater offsets. - - This is used if the `post_0_10_2` config is set to false + This is the legacy codepath which is used when either broker version < 0.10.2 or zk_connect_str has a value. """ SOURCE_TYPE_NAME = 'kafka' @@ -55,13 +53,11 @@ def __init__(self, name, init_config, instances): self._zk_client.start() def check(self, instance): - # For calculating lag, we have to fetch offsets from both kafka and - # zookeeper. There's a potential race condition because whichever one we - # check first may be outdated by the time we check the other. Better to - # check consumer offset before checking broker offset because worst case - # is that overstates consumer lag a little. Doing it the other way can - # understate consumer lag to the point of having negative consumer lag, - # which just creates confusion because it's theoretically impossible. + # For calculating consumer lag, we have to fetch both the consumer offset and the broker highwater offset. + # There's a potential race condition because whichever one we check first may be outdated by the time we check + # the other. Better to check consumer offsets before checking broker offsets because worst case is that + # overstates consumer lag a little. Doing it the other way can understate consumer lag to the point of having + # negative consumer lag, which just creates confusion because it's theoretically impossible. # If monitor_unlisted_consumer_groups is True, fetch all groups stored in ZK consumer_groups = None @@ -82,18 +78,18 @@ def check(self, instance): self._kafka_client._maybe_refresh_metadata() # Fetch consumer group offsets from Kafka + + # For legacy reasons, this only fetches consumer offsets from kafka if zookeeper is omitted or + # kafka_consumer_offsets is True. if is_affirmative(instance.get('kafka_consumer_offsets', self._zk_hosts_ports is None)): # For now, consumer groups are mandatory if not using ZK if self._zk_hosts_ports is None and not consumer_groups: raise ConfigurationError( - 'Invalid configuration - if you are not collecting ' - 'offsets from ZK you _must_ specify consumer groups' + 'Invalid configuration - if you are collecting consumer offsets from Kafka, and your brokers are ' + 'older than 0.10.2, then you _must_ specify consumer groups and their topics. Older brokers lack ' + 'the necessary protocol support to determine which topics a consumer is consuming. See KIP-88 for ' + 'details.' ) - # kafka-python automatically probes the cluster for broker version - # and then stores it. Note that this returns the first version - # found, so in a mixed-version cluster this will be a - # non-deterministic result. - # # Kafka 0.8.2 added support for storing consumer offsets in Kafka. if self._kafka_client.config.get('api_version') >= (0, 8, 2): kafka_consumer_offsets, topics = self._get_kafka_consumer_offsets(instance, consumer_groups) @@ -146,9 +142,12 @@ def _create_kafka_client(self): bootstrap_servers=kafka_conn_str, client_id='dd-agent', request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000, + # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for broker + # version during the bootstrapping process. Note that probing randomly picks a broker to probe, so in a + # mixed-version cluster probing returns a non-deterministic result. api_version=self.instance.get('kafka_client_api_version'), - # While we check for SSL params, if not present they will default - # to the kafka-python values for plaintext connections + # While we check for SSL params, if not present they will default to the kafka-python values for plaintext + # connections security_protocol=self.instance.get('security_protocol', 'PLAINTEXT'), sasl_mechanism=self.instance.get('sasl_mechanism'), sasl_plain_username=self.instance.get('sasl_plain_username'), @@ -168,8 +167,7 @@ def _make_blocking_req(self, request, node_id=None): node_id = self._kafka_client.least_loaded_node() while not self._kafka_client.ready(node_id): - # poll until the connection to broker is ready, otherwise send() - # will fail with NodeNotReadyError + # poll until the connection to broker is ready, otherwise send() will fail with NodeNotReadyError self._kafka_client.poll() future = self._kafka_client.send(node_id, request) @@ -222,21 +220,13 @@ def _process_highwater_offsets(self, response): def _get_broker_offsets(self, topics): """ - Fetch highwater offsets for each topic/partition from Kafka cluster. + Fetch highwater offsets for topic_partitions in the Kafka cluster. - Do this for all partitions in the cluster because even if it has no - consumers, we may want to measure whether producers are successfully - producing. No need to limit this for performance because fetching broker - offsets from Kafka is a relatively inexpensive operation. + Do this for all partitions in the cluster because even if it has no consumers, we may want to measure whether + producers are successfully producing. - Sends one OffsetRequest per broker to get offsets for all partitions - where that broker is the leader: + Sends one OffsetRequest per broker to get offsets for all partitions where that broker is the leader: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset) - - Can we cleanup connections on agent restart? - Brokers before 0.9 - accumulate stale connections on restarts. - In 0.9 Kafka added connections.max.idle.ms - https://issues.apache.org/jira/browse/KAFKA-1282 """ # Connect to Kafka @@ -292,9 +282,8 @@ def _report_consumer_metrics( ) if (topic, partition) not in unled_topic_partitions: self.log.warn( - "Consumer group: %s has offsets for topic: %s " - "partition: %s, but that topic partition doesn't actually " - "exist in the cluster.", + "Consumer group: %s has offsets for topic: %s partition: %s, but that topic partition doesn't " + "actually exist in the cluster.", consumer_group, topic, partition,