Refactor broker offset metric collection #13934

Merged · 6 commits · Feb 13, 2023
@@ -5,7 +5,7 @@ class ConfluentKafkaClient:
    def __init__(self) -> None:
        pass

-    def get_consumer_offset_and_lag(self):
+    def get_consumer_offsets(self):
        pass

    def get_broker_offset(self):
@@ -16,3 +16,6 @@ def report_consumer_offset_and_lag(self):

    def report_broker_offset(self):
        pass
+
+    def collect_broker_metadata(self):
+        pass
@@ -5,7 +5,7 @@ class KafkaClient:
    def __init__(self) -> None:
        pass

-    def get_consumer_offset_and_lag(self):
+    def get_consumer_offsets(self):
        pass

    def get_broker_offset(self):
@@ -16,3 +16,6 @@ def report_consumer_offset_and_lag(self):

    def report_broker_offset(self):
        pass
+
+    def collect_broker_metadata(self):
+        pass
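
Both stubbed clients now expose the same surface: get_consumer_offsets replaces get_consumer_offset_and_lag, and collect_broker_metadata joins the existing get/report methods. A minimal sketch of the common interface the stubs imply (the base class is hypothetical, not part of this PR):

from abc import ABC, abstractmethod


class KafkaClientInterface(ABC):
    """Shared surface implied by the ConfluentKafkaClient and KafkaClient stubs above."""

    @abstractmethod
    def get_consumer_offsets(self):
        ...

    @abstractmethod
    def get_broker_offset(self):
        ...

    @abstractmethod
    def report_consumer_offset_and_lag(self):
        ...

    @abstractmethod
    def report_broker_offset(self):
        ...

    @abstractmethod
    def collect_broker_metadata(self):
        ...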
@@ -4,13 +4,15 @@
from collections import defaultdict
from time import time

-import six
from kafka import errors as kafka_errors
from kafka.protocol.admin import ListGroupsRequest
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
from kafka.structs import TopicPartition
-from six import string_types
+from six import iteritems, string_types

-from datadog_checks.base import ConfigurationError
+from datadog_checks.base import AgentCheck, ConfigurationError
from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS


class KafkaPythonClient:
@@ -19,17 +21,158 @@ def __init__(self, check) -> None:
        self.log = check.log
        self.kafka_client = check.kafka_client

    @AgentCheck.metadata_entrypoint
    def collect_broker_metadata(self):
        return self._collect_broker_metadata()

    def _collect_broker_metadata(self):
        version_data = [str(part) for part in self.kafka_client._client.check_version()]
        version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)}
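        # e.g. check_version() returning (2, 8, 1) yields version_data ['2', '8', '1']
        # and version_parts {'major': '2', 'minor': '8', 'patch': '1'}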

        self.check.set_metadata(
            'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts
        )

    def get_consumer_offsets(self):
        return self._get_consumer_offsets()

    def get_broker_offset(self):
-        pass
+        return self._get_broker_offset()

def _get_broker_offset(self):
"""Fetch highwater offsets for topic_partitions in the Kafka cluster.

        Do this for all partitions in the cluster because even if a topic has no consumers, we may still want to
        measure whether producers are successfully producing.

If monitor_all_broker_highwatermarks is True, will fetch for all partitions in the cluster. Otherwise highwater
mark offsets will only be fetched for topic partitions where this check run has already fetched a consumer
offset.

        Internal Kafka topics like __consumer_offsets, __transaction_state, etc. are always excluded.

Any partitions that don't currently have a leader will be skipped.

Sends one OffsetRequest per broker to get offsets for all partitions where that broker is the leader:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)

For speed, all the brokers are queried in parallel using callbacks. The callback flow is:
1. Issue an OffsetRequest to every broker
2. Attach a callback to each OffsetResponse that parses the response and saves the highwater offsets.
"""
highwater_futures = [] # No need to store on object because the callbacks don't create additional futures

# If we aren't fetching all broker highwater offsets, then construct the unique set of topic partitions for
        # which this run of the check has at least one saved consumer offset. This is later used as a filter for
# excluding partitions.
if not self.check._monitor_all_broker_highwatermarks:
tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in self.check._consumer_offsets}

for batch in self.batchify(self.kafka_client._client.cluster.brokers(), self.check._broker_requests_batch_size):
for broker in batch:
broker_led_partitions = self.kafka_client._client.cluster.partitions_for_broker(broker.nodeId)
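                # partitions_for_broker() returns None when the cached cluster metadata
                # records no partitions led by this broker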
if broker_led_partitions is None:
continue

# Take the partitions for which this broker is the leader and group them by topic in order to construct
# the OffsetRequest while simultaneously filtering out partitions we want to exclude
partitions_grouped_by_topic = defaultdict(list)
for topic, partition in broker_led_partitions:
# No sense fetching highwater offsets for internal topics
if topic not in KAFKA_INTERNAL_TOPICS and (
self.check._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset
):
partitions_grouped_by_topic[topic].append(partition)

# Construct the OffsetRequest
max_offsets = 1
request = OffsetRequest[0](
replica_id=-1,
topics=[
(topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions])
for topic, partitions in partitions_grouped_by_topic.items()
],
)
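                # e.g. the request built above: OffsetRequest[0](replica_id=-1,
                # topics=[('mytopic', [(0, -1, 1)])]), where -1 is OffsetResetStrategy.LATEST
                # and 1 is max_offsets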

# We can disable wakeup here because it is the same thread doing both polling and sending. Also, it
# is possible that the wakeup itself could block if a large number of sends were processed beforehand.
highwater_future = self._send_request_to_node(node_id=broker.nodeId, request=request, wakeup=False)

highwater_future.add_callback(self._highwater_offsets_callback)
highwater_futures.append(highwater_future)

# Loop until all futures resolved.
self.kafka_client._wait_for_futures(highwater_futures)

def _highwater_offsets_callback(self, response):
"""Callback that parses an OffsetFetchResponse and saves it to the highwater_offsets dict."""
if type(response) not in OffsetResponse:
raise RuntimeError("response type should be OffsetResponse, but instead was %s." % type(response))
for topic, partitions_data in response.topics:
for partition, error_code, offsets in partitions_data:
error_type = kafka_errors.for_code(error_code)
if error_type is kafka_errors.NoError:
self.check._highwater_offsets[(topic, partition)] = offsets[0]
elif error_type is kafka_errors.NotLeaderForPartitionError:
self.log.warning(
"Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen "
"if the broker that was the partition leader when kafka_admin_client last fetched metadata is "
"no longer the leader.",
error_type.message,
error_type.errno,
topic,
partition,
)
self.kafka_client._client.cluster.request_update() # force metadata update on next poll()
elif error_type is kafka_errors.UnknownTopicOrPartitionError:
self.log.warning(
"Kafka broker returned %s (error_code %s) for topic: %s, partition: %s. This should only "
"happen if the topic is currently being deleted or the check configuration lists non-existent "
"topic partitions.",
error_type.message,
error_type.errno,
topic,
partition,
)
else:
raise error_type(
"Unexpected error encountered while attempting to fetch the highwater offsets for topic: %s, "
"partition: %s." % (topic, partition)
)

@staticmethod
def batchify(iterable, batch_size):
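        # e.g. list(batchify(['b1', 'b2', 'b3'], 2)) == [['b1', 'b2'], ['b3']]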
iterable = list(iterable)
return (iterable[i : i + batch_size] for i in range(0, len(iterable), batch_size))

# FIXME: This is using a workaround to skip socket wakeup, which causes blocking
    # (see https://github.com/dpkp/kafka-python/issues/2286).
    # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official
# implementation for this function instead.
def _send_request_to_node(self, node_id, request, wakeup=True):
while not self.kafka_client._client.ready(node_id):
# poll until the connection to broker is ready, otherwise send()
# will fail with NodeNotReadyError
self.kafka_client._client.poll()
return self.kafka_client._client.send(node_id, request, wakeup=wakeup)

    def report_consumer_offsets_and_lag(self):
        return self._report_consumer_offsets_and_lag()

-    def report_broker_offset(self):
-        pass
+    def report_broker_offset(self, contexts_limit):
+        return self._report_broker_offset(contexts_limit)

def _report_broker_offset(self, contexts_limit):
"""Report the broker highwater offsets."""
reported_contexts = 0
self.log.debug("Reporting broker offset metric")
        for (topic, partition), highwater_offset in self.check._highwater_offsets.items():
broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
broker_tags.extend(self.check._custom_tags)
self.check.gauge('broker_offset', highwater_offset, tags=broker_tags)
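            # e.g. emits gauge 'broker_offset' = 86 tagged ['topic:mytopic', 'partition:0'] plus custom tags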
reported_contexts += 1
if reported_contexts == contexts_limit:
return

def _validate_consumer_groups(self):
"""Validate any explicitly specified consumer groups.
@@ -230,7 +373,7 @@ def _list_consumer_groups_send_request(self, broker_id):
"Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient.".format(kafka_version)
)
# Disable wakeup when sending request to prevent blocking send requests
-        return self.check._send_request_to_node(broker_id, request, wakeup=False)
+        return self._send_request_to_node(broker_id, request, wakeup=False)

def _find_coordinator_id_send_request(self, group_id):
"""Send a FindCoordinatorRequest to a broker.
@@ -240,7 +383,7 @@ def _find_coordinator_id_send_request(self, group_id):
"""
version = 0
request = GroupCoordinatorRequest[version](group_id)
-        return self.check._send_request_to_node(self.kafka_client._client.least_loaded_node(), request, wakeup=False)
+        return self._send_request_to_node(self.kafka_client._client.least_loaded_node(), request, wakeup=False)

def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_id, partitions=None):
"""Send an OffsetFetchRequest to a broker.
@@ -267,13 +410,13 @@ def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_
topics_partitions_dict = defaultdict(set)
for topic, partition in partitions:
topics_partitions_dict[topic].add(partition)
-            topics_partitions = list(six.iteritems(topics_partitions_dict))
+            topics_partitions = list(iteritems(topics_partitions_dict))
request = OffsetFetchRequest[version](group_id, topics_partitions)
else:
raise NotImplementedError(
"Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient.".format(version)
)
-        return self.check._send_request_to_node(group_coordinator_id, request, wakeup=False)
+        return self._send_request_to_node(group_coordinator_id, request, wakeup=False)

def _send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
"""Emit an event to the Datadog Event Stream."""
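
End to end, a run of the check now delegates offset collection and reporting to the client object. A rough sketch of that driver flow (the wiring and attribute names here are assumptions for illustration, not code from this PR):

def check(self, _instance):
    # hypothetical driver: client is a KafkaPythonClient (or, eventually, a
    # ConfluentKafkaClient / KafkaClient) built from the check configuration
    client = self.client
    client.get_consumer_offsets()
    client.get_broker_offset()
    client.report_consumer_offsets_and_lag()
    client.report_broker_offset(contexts_limit=self._context_limit)
    client.collect_broker_metadata()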