diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py
index 76d32d7125de6e..e4c03d0176d717 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py
@@ -1,12 +1,136 @@
+from collections import defaultdict
+
+from kafka import errors as kafka_errors
+from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
+
+from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS
+
+
 class KafkaPythonClient:
-    def __init__(self) -> None:
-        pass
+    def __init__(self, check) -> None:
+        self.check = check
 
     def get_consumer_offset_and_lag(self):
         pass
 
     def get_broker_offset(self):
-        pass
+        """Fetch highwater offsets for topic_partitions in the Kafka cluster.
+
+        Do this for all partitions in the cluster because even if a partition has no consumers, we may want to
+        measure whether producers are successfully producing.
+
+        If monitor_all_broker_highwatermarks is True, will fetch for all partitions in the cluster. Otherwise, highwater
+        mark offsets will only be fetched for topic partitions where this check run has already fetched a consumer
+        offset.
+
+        Internal Kafka topics like __consumer_offsets, __transaction_state, etc. are always excluded.
+
+        Any partitions that don't currently have a leader will be skipped.
+
+        Sends one OffsetRequest per broker to get offsets for all partitions where that broker is the leader:
+        https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)
+
+        For speed, all the brokers are queried in parallel using callbacks. The callback flow is:
+        1. Issue an OffsetRequest to every broker.
+        2. Attach a callback to each OffsetResponse that parses the response and saves the highwater offsets.
+        """
+        highwater_futures = []  # No need to store on object because the callbacks don't create additional futures
+
+        # If we aren't fetching all broker highwater offsets, then construct the unique set of topic partitions for
+        # which this run of the check has at least one saved consumer offset. This is later used as a filter for
+        # excluding partitions.
+        if not self.check._monitor_all_broker_highwatermarks:
+            tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in self.check._consumer_offsets}
+
+        for batch in self.batchify(
+            self.check.kafka_client._client.cluster.brokers(), self.check._broker_requests_batch_size
+        ):
+            for broker in batch:
+                broker_led_partitions = self.check.kafka_client._client.cluster.partitions_for_broker(broker.nodeId)
+                if broker_led_partitions is None:
+                    continue
+
+                # Take the partitions for which this broker is the leader and group them by topic in order to construct
+                # the OffsetRequest while simultaneously filtering out partitions we want to exclude
+                partitions_grouped_by_topic = defaultdict(list)
+                for topic, partition in broker_led_partitions:
+                    # No sense fetching highwater offsets for internal topics
+                    if topic not in KAFKA_INTERNAL_TOPICS and (
+                        self.check._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset
+                    ):
+                        partitions_grouped_by_topic[topic].append(partition)
+
+                # Construct the OffsetRequest
+                max_offsets = 1
+                request = OffsetRequest[0](
+                    replica_id=-1,
+                    topics=[
+                        (topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions])
+                        for topic, partitions in partitions_grouped_by_topic.items()
+                    ],
+                )
+
+                # We can disable wakeup here because it is the same thread doing both polling and sending. Also, it
+                # is possible that the wakeup itself could block if a large number of sends were processed beforehand.
+                highwater_future = self._send_request_to_node(node_id=broker.nodeId, request=request, wakeup=False)
+
+                highwater_future.add_callback(self._highwater_offsets_callback)
+                highwater_futures.append(highwater_future)
+
+            # Loop until all futures resolved.
+            self.check.kafka_client._wait_for_futures(highwater_futures)
+
+    def _highwater_offsets_callback(self, response):
+        """Callback that parses an OffsetResponse and saves it to the highwater_offsets dict."""
+        if type(response) not in OffsetResponse:
+            raise RuntimeError("response type should be OffsetResponse, but instead was %s." % type(response))
+        for topic, partitions_data in response.topics:
+            for partition, error_code, offsets in partitions_data:
+                error_type = kafka_errors.for_code(error_code)
+                if error_type is kafka_errors.NoError:
+                    self.check._highwater_offsets[(topic, partition)] = offsets[0]
+                elif error_type is kafka_errors.NotLeaderForPartitionError:
+                    self.check.log.warning(
+                        "Kafka broker returned %s (error_code %s) for topic: %s, partition: %s. This should only happen "
+                        "if the broker that was the partition leader when kafka_admin_client last fetched metadata is "
+                        "no longer the leader.",
+                        error_type.message,
+                        error_type.errno,
+                        topic,
+                        partition,
+                    )
+                    self.check.kafka_client._client.cluster.request_update()  # force metadata update on next poll()
+                elif error_type is kafka_errors.UnknownTopicOrPartitionError:
+                    self.check.log.warning(
+                        "Kafka broker returned %s (error_code %s) for topic: %s, partition: %s. This should only "
+                        "happen if the topic is currently being deleted or the check configuration lists non-existent "
+                        "topic partitions.",
+                        error_type.message,
+                        error_type.errno,
+                        topic,
+                        partition,
+                    )
+                else:
+                    raise error_type(
+                        "Unexpected error encountered while attempting to fetch the highwater offsets for topic: %s, "
+                        "partition: %s." % (topic, partition)
+                    )
+
+    @staticmethod
+    def batchify(iterable, batch_size):
+        iterable = list(iterable)
+        return (iterable[i : i + batch_size] for i in range(0, len(iterable), batch_size))
+
+    # FIXME: This is using a workaround to skip socket wakeup, which causes blocking
+    # (see https://github.com/dpkp/kafka-python/issues/2286).
+    # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official
+    # implementation for this function instead.
+    def _send_request_to_node(self, node_id, request, wakeup=True):
+        while not self.check.kafka_client._client.ready(node_id):
+            # poll until the connection to broker is ready, otherwise send()
+            # will fail with NodeNotReadyError
+            self.check.kafka_client._client.poll()
+        return self.check.kafka_client._client.send(node_id, request, wakeup=wakeup)
 
     def report_consumer_offset_and_lag(self):
         pass
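
For reference, here is a minimal sketch (not part of the diff) of how `batchify` chunks the broker list before requests are sent; the broker names and the batch size of 2 are made up for illustration only.

```python
# Hypothetical broker list; in the check this comes from cluster.brokers()
brokers = ["broker-0", "broker-1", "broker-2", "broker-3", "broker-4"]


def batchify(iterable, batch_size):
    # Same helper as in the diff: yield successive batch_size-sized slices
    iterable = list(iterable)
    return (iterable[i : i + batch_size] for i in range(0, len(iterable), batch_size))


for batch in batchify(brokers, 2):
    # The check sends one OffsetRequest per broker in the batch, then waits on
    # those futures before moving on to the next batch.
    print(batch)
# ['broker-0', 'broker-1']
# ['broker-2', 'broker-3']
# ['broker-4']
```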