Skip to content

Commit

Permalink
Add broker offset metric collection
Browse files Browse the repository at this point in the history
  • Loading branch information
yzhan289 committed Feb 10, 2023
1 parent d056efd commit 6a038f7
Showing 1 changed file with 127 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,136 @@
from collections import defaultdict

from kafka import errors as kafka_errors
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse

from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS


class KafkaPythonClient:
def __init__(self) -> None:
pass
def __init__(self, check) -> None:
self.check = check

def get_consumer_offset_and_lag(self):
pass

def get_broker_offset(self):
pass
"""Fetch highwater offsets for topic_partitions in the Kafka cluster.
Do this for all partitions in the cluster because even if it has no consumers, we may want to measure whether
producers are successfully producing.
If monitor_all_broker_highwatermarks is True, will fetch for all partitions in the cluster. Otherwise highwater
mark offsets will only be fetched for topic partitions where this check run has already fetched a consumer
offset.
Internal Kafka topics like __consumer_offsets, __transaction_state, etc are always excluded.
Any partitions that don't currently have a leader will be skipped.
Sends one OffsetRequest per broker to get offsets for all partitions where that broker is the leader:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)
For speed, all the brokers are queried in parallel using callbacks. The callback flow is:
1. Issue an OffsetRequest to every broker
2. Attach a callback to each OffsetResponse that parses the response and saves the highwater offsets.
"""
highwater_futures = [] # No need to store on object because the callbacks don't create additional futures

# If we aren't fetching all broker highwater offsets, then construct the unique set of topic partitions for
# which this run of the check has at least once saved consumer offset. This is later used as a filter for
# excluding partitions.
if not self.check._monitor_all_broker_highwatermarks:
tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in self.check._consumer_offsets}

for batch in self.batchify(
self.check.kafka_client._client.cluster.brokers(), self.check._broker_requests_batch_size
):
for broker in batch:
broker_led_partitions = self.check.kafka_client._client.cluster.partitions_for_broker(broker.nodeId)
if broker_led_partitions is None:
continue

# Take the partitions for which this broker is the leader and group them by topic in order to construct
# the OffsetRequest while simultaneously filtering out partitions we want to exclude
partitions_grouped_by_topic = defaultdict(list)
for topic, partition in broker_led_partitions:
# No sense fetching highwater offsets for internal topics
if topic not in KAFKA_INTERNAL_TOPICS and (
self.check._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset
):
partitions_grouped_by_topic[topic].append(partition)

# Construct the OffsetRequest
max_offsets = 1
request = OffsetRequest[0](
replica_id=-1,
topics=[
(topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions])
for topic, partitions in partitions_grouped_by_topic.items()
],
)

# We can disable wakeup here because it is the same thread doing both polling and sending. Also, it
# is possible that the wakeup itself could block if a large number of sends were processed beforehand.
highwater_future = self._send_request_to_node(node_id=broker.nodeId, request=request, wakeup=False)

highwater_future.add_callback(self._highwater_offsets_callback)
highwater_futures.append(highwater_future)

# Loop until all futures resolved.
self.check.kafka_client._wait_for_futures(highwater_futures)

def _highwater_offsets_callback(self, response):
"""Callback that parses an OffsetFetchResponse and saves it to the highwater_offsets dict."""
if type(response) not in OffsetResponse:
raise RuntimeError("response type should be OffsetResponse, but instead was %s." % type(response))
for topic, partitions_data in response.topics:
for partition, error_code, offsets in partitions_data:
error_type = kafka_errors.for_code(error_code)
if error_type is kafka_errors.NoError:
self.check._highwater_offsets[(topic, partition)] = offsets[0]
elif error_type is kafka_errors.NotLeaderForPartitionError:
self.check.log.warning(
"Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen "
"if the broker that was the partition leader when kafka_admin_client last fetched metadata is "
"no longer the leader.",
error_type.message,
error_type.errno,
topic,
partition,
)
self.check.kafka_client._client.cluster.request_update() # force metadata update on next poll()
elif error_type is kafka_errors.UnknownTopicOrPartitionError:
self.check.log.warning(
"Kafka broker returned %s (error_code %s) for topic: %s, partition: %s. This should only "
"happen if the topic is currently being deleted or the check configuration lists non-existent "
"topic partitions.",
error_type.message,
error_type.errno,
topic,
partition,
)
else:
raise error_type(
"Unexpected error encountered while attempting to fetch the highwater offsets for topic: %s, "
"partition: %s." % (topic, partition)
)

@staticmethod
def batchify(iterable, batch_size):
iterable = list(iterable)
return (iterable[i : i + batch_size] for i in range(0, len(iterable), batch_size))

# FIXME: This is using a workaround to skip socket wakeup, which causes blocking
# (see https://github.com/dpkp/kafka-python/issues/2286).
# Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official
# implementation for this function instead.
def _send_request_to_node(self, node_id, request, wakeup=True):
while not self.check.kafka_client._client.ready(node_id):
# poll until the connection to broker is ready, otherwise send()
# will fail with NodeNotReadyError
self.check.kafka_client._client.poll()
return self.check.kafka_client._client.send(node_id, request, wakeup=wakeup)

def report_consumer_offset_and_lag(self):
pass
Expand Down

0 comments on commit 6a038f7

Please sign in to comment.