diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py
index b6f8dbf248cb5..bd990141608c6 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py
@@ -5,7 +5,7 @@ class ConfluentKafkaClient:
     def __init__(self) -> None:
         pass
 
-    def get_consumer_offset_and_lag(self):
+    def get_consumer_offsets(self):
         pass
 
     def get_broker_offset(self):
@@ -16,3 +16,6 @@ def report_consumer_offset_and_lag(self):
 
     def report_broker_offset(self):
         pass
+
+    def collect_broker_metadata(self):
+        pass
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py
index 1398feebd71b1..6ad5525f1b150 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py
@@ -5,7 +5,7 @@ class KafkaClient:
     def __init__(self) -> None:
         pass
 
-    def get_consumer_offset_and_lag(self):
+    def get_consumer_offsets(self):
        pass
 
     def get_broker_offset(self):
@@ -16,3 +16,6 @@ def report_consumer_offset_and_lag(self):
 
     def report_broker_offset(self):
         pass
+
+    def collect_broker_metadata(self):
+        pass
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py
index 2398f805b4d61..0f2299bff4d9b 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py
@@ -4,13 +4,15 @@
 from collections import defaultdict
 from time import time
 
-import six
+from kafka import errors as kafka_errors
 from kafka.protocol.admin import ListGroupsRequest
 from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
+from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
 from kafka.structs import TopicPartition
-from six import string_types
+from six import iteritems, string_types
 
-from datadog_checks.base import ConfigurationError
+from datadog_checks.base import AgentCheck, ConfigurationError
+from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS
 
 
 class KafkaPythonClient:
@@ -19,17 +21,158 @@ def __init__(self, check) -> None:
         self.log = check.log
         self.kafka_client = check.kafka_client
 
+    @AgentCheck.metadata_entrypoint
+    def collect_broker_metadata(self):
+        return self._collect_broker_metadata()
+
+    def _collect_broker_metadata(self):
+        version_data = [str(part) for part in self.kafka_client._client.check_version()]
+        version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)}
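+        # Illustrative: a broker detected as version (1, 0, 0) yields version_data == ['1', '0', '0']
+        # and version_parts == {'major': '1', 'minor': '0', 'patch': '0'}.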
+
+        self.check.set_metadata(
+            'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts
+        )
+
     def get_consumer_offsets(self):
         return self._get_consumer_offsets
 
     def get_broker_offset(self):
-        pass
+        return self._get_broker_offset()
+
+    def _get_broker_offset(self):
+        """Fetch highwater offsets for topic_partitions in the Kafka cluster.
+
+        Do this for all partitions in the cluster because even if it has no consumers, we may want to measure whether
+        producers are successfully producing.
+
+        If monitor_all_broker_highwatermarks is True, will fetch for all partitions in the cluster. Otherwise highwater
+        mark offsets will only be fetched for topic partitions where this check run has already fetched a consumer
+        offset.
+
+        Internal Kafka topics like __consumer_offsets, __transaction_state, etc are always excluded.
+
+        Any partitions that don't currently have a leader will be skipped.
+
+        Sends one OffsetRequest per broker to get offsets for all partitions where that broker is the leader:
+        https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)
+
+        For speed, all the brokers are queried in parallel using callbacks. The callback flow is:
+            1. Issue an OffsetRequest to every broker
+            2. Attach a callback to each OffsetResponse that parses the response and saves the highwater offsets.
+        """
+        highwater_futures = []  # No need to store on object because the callbacks don't create additional futures
+
+        # If we aren't fetching all broker highwater offsets, then construct the unique set of topic partitions for
+        # which this run of the check has at least once saved consumer offset. This is later used as a filter for
+        # excluding partitions.
+        if not self.check._monitor_all_broker_highwatermarks:
+            tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in self.check._consumer_offsets}
+
+        for batch in self.batchify(self.kafka_client._client.cluster.brokers(), self.check._broker_requests_batch_size):
+            for broker in batch:
+                broker_led_partitions = self.kafka_client._client.cluster.partitions_for_broker(broker.nodeId)
+                if broker_led_partitions is None:
+                    continue
+
+                # Take the partitions for which this broker is the leader and group them by topic in order to
+                # construct the OffsetRequest while simultaneously filtering out partitions we want to exclude
+                partitions_grouped_by_topic = defaultdict(list)
+                for topic, partition in broker_led_partitions:
+                    # No sense fetching highwater offsets for internal topics
+                    if topic not in KAFKA_INTERNAL_TOPICS and (
+                        self.check._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset
+                    ):
+                        partitions_grouped_by_topic[topic].append(partition)
+
+                # Construct the OffsetRequest
+                max_offsets = 1
+                request = OffsetRequest[0](
+                    replica_id=-1,
+                    topics=[
+                        (topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions])
+                        for topic, partitions in partitions_grouped_by_topic.items()
+                    ],
+                )
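+                # Illustrative payload: topics=[('app_events', [(0, -1, 1), (1, -1, 1)])], where 'app_events'
+                # is a made-up topic name, -1 is OffsetResetStrategy.LATEST, and 1 is max_offsets.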
+
+                # We can disable wakeup here because it is the same thread doing both polling and sending. Also, it
+                # is possible that the wakeup itself could block if a large number of sends were processed beforehand.
+                highwater_future = self._send_request_to_node(node_id=broker.nodeId, request=request, wakeup=False)
+
+                highwater_future.add_callback(self._highwater_offsets_callback)
+                highwater_futures.append(highwater_future)
+
+        # Loop until all futures resolved.
+        self.kafka_client._wait_for_futures(highwater_futures)
+
+    def _highwater_offsets_callback(self, response):
+        """Callback that parses an OffsetResponse and saves it to the highwater_offsets dict."""
+        if type(response) not in OffsetResponse:
+            raise RuntimeError("response type should be OffsetResponse, but instead was %s." % type(response))
+        for topic, partitions_data in response.topics:
+            for partition, error_code, offsets in partitions_data:
+                error_type = kafka_errors.for_code(error_code)
+                if error_type is kafka_errors.NoError:
+                    self.check._highwater_offsets[(topic, partition)] = offsets[0]
+                elif error_type is kafka_errors.NotLeaderForPartitionError:
+                    self.log.warning(
+                        "Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen "
+                        "if the broker that was the partition leader when kafka_admin_client last fetched metadata is "
+                        "no longer the leader.",
+                        error_type.message,
+                        error_type.errno,
+                        topic,
+                        partition,
+                    )
+                    self.kafka_client._client.cluster.request_update()  # force metadata update on next poll()
+                elif error_type is kafka_errors.UnknownTopicOrPartitionError:
+                    self.log.warning(
+                        "Kafka broker returned %s (error_code %s) for topic: %s, partition: %s. This should only "
+                        "happen if the topic is currently being deleted or the check configuration lists non-existent "
+                        "topic partitions.",
+                        error_type.message,
+                        error_type.errno,
+                        topic,
+                        partition,
+                    )
+                else:
+                    raise error_type(
+                        "Unexpected error encountered while attempting to fetch the highwater offsets for topic: %s, "
+                        "partition: %s." % (topic, partition)
+                    )
+
+    @staticmethod
+    def batchify(iterable, batch_size):
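+        # e.g. batchify([1, 2, 3, 4, 5], 2) yields [1, 2], [3, 4], then [5]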
+        iterable = list(iterable)
+        return (iterable[i : i + batch_size] for i in range(0, len(iterable), batch_size))
+
+    # FIXME: This is using a workaround to skip socket wakeup, which causes blocking
+    # (see https://github.com/dpkp/kafka-python/issues/2286).
+    # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official
+    # implementation for this function instead.
+    def _send_request_to_node(self, node_id, request, wakeup=True):
+        while not self.kafka_client._client.ready(node_id):
+            # poll until the connection to broker is ready, otherwise send()
+            # will fail with NodeNotReadyError
+            self.kafka_client._client.poll()
+        return self.kafka_client._client.send(node_id, request, wakeup=wakeup)
 
     def report_consumer_offsets_and_lag(self):
         return self._report_consumer_offsets_and_lag
 
-    def report_broker_offset(self):
-        pass
+    def report_broker_offset(self, contexts_limit):
+        return self._report_broker_offset(contexts_limit)
+
+    def _report_broker_offset(self, contexts_limit):
+        """Report the broker highwater offsets."""
+        reported_contexts = 0
+        self.log.debug("Reporting broker offset metric")
+        for (topic, partition), highwater_offset in self.check._highwater_offsets.items():
+            broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
+            broker_tags.extend(self.check._custom_tags)
+            self.check.gauge('broker_offset', highwater_offset, tags=broker_tags)
+            reported_contexts += 1
+            if reported_contexts == contexts_limit:
+                return
 
     def _validate_consumer_groups(self):
         """Validate any explicitly specified consumer groups.
@@ -230,7 +373,7 @@ def _list_consumer_groups_send_request(self, broker_id):
             "Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient.".format(kafka_version)
         )
         # Disable wakeup when sending request to prevent blocking send requests
-        return self.check._send_request_to_node(broker_id, request, wakeup=False)
+        return self._send_request_to_node(broker_id, request, wakeup=False)
 
     def _find_coordinator_id_send_request(self, group_id):
         """Send a FindCoordinatorRequest to a broker.
@@ -240,7 +383,7 @@ def _find_coordinator_id_send_request(self, group_id):
         """
         version = 0
         request = GroupCoordinatorRequest[version](group_id)
-        return self.check._send_request_to_node(self.kafka_client._client.least_loaded_node(), request, wakeup=False)
+        return self._send_request_to_node(self.kafka_client._client.least_loaded_node(), request, wakeup=False)
 
     def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_id, partitions=None):
         """Send an OffsetFetchRequest to a broker.
@@ -267,13 +410,13 @@ def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_
             topics_partitions_dict = defaultdict(set)
             for topic, partition in partitions:
                 topics_partitions_dict[topic].add(partition)
-            topics_partitions = list(six.iteritems(topics_partitions_dict))
+            topics_partitions = list(iteritems(topics_partitions_dict))
             request = OffsetFetchRequest[version](group_id, topics_partitions)
         else:
             raise NotImplementedError(
                 "Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient.".format(version)
             )
-        return self.check._send_request_to_node(group_coordinator_id, request, wakeup=False)
+        return self._send_request_to_node(group_coordinator_id, request, wakeup=False)
 
     def _send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
         """Emit an event to the Datadog Event Stream."""
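
A minimal sketch of how a check might drive the refactored client interface above, for illustration only. The `KafkaCheck` wiring, the `_context_limit` attribute, and the call order are assumptions and are not part of this diff:

    # Hypothetical caller (not part of this patch):
    class KafkaCheck(AgentCheck):
        def check(self, instance):
            client = KafkaPythonClient(self)
            client.get_consumer_offsets()       # populates self._consumer_offsets on the check
            client.get_broker_offset()          # populates self._highwater_offsets on the check
            client.report_consumer_offsets_and_lag()
            client.report_broker_offset(self._context_limit)
            client.collect_broker_metadata()    # submits the broker version metadata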