Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the get_dict methods from the clients #14149

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Remove the get_dict methods from the clients
  • Loading branch information
FlorentClarret committed Mar 15, 2023
commit 280f25a7699f3f38431ac7495e08854294a5fa81
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ def kafka_client(self):
)
return self._kafka_client

def get_consumer_offsets_dict(self):
return self._consumer_offsets

def get_highwater_offsets(self, consumer_offsets):
# TODO: Remove broker_requests_batch_size as config after
# kafka-python is removed if we don't need to batch requests in Confluent
highwater_offsets = {}
topics_with_consumer_offset = {}
if not self.config._monitor_all_broker_highwatermarks:
topics_with_consumer_offset = {(topic, partition) for (_, topic, partition) in consumer_offsets}
Expand All @@ -55,14 +53,9 @@ def get_highwater_offsets(self, consumer_offsets):
):
_, high_offset = consumer.get_watermark_offsets(topic_partition)

self._highwater_offsets[(topic, partition)] = high_offset

def get_highwater_offsets_dict(self):
return self._highwater_offsets
highwater_offsets[(topic, partition)] = high_offset

def reset_offsets(self):
self._consumer_offsets = {}
self._highwater_offsets = {}
return highwater_offsets

def get_partitions_for_topic(self, topic):
try:
Expand All @@ -81,6 +74,7 @@ def request_metadata_update(self):
def get_consumer_offsets(self):
# {(consumer_group, topic, partition): offset}
offset_futures = []
consumer_offsets = {}

if self.config._monitor_unlisted_consumer_groups:
# Get all consumer groups
Expand Down Expand Up @@ -142,10 +136,12 @@ def get_consumer_offsets(self):
topic_partition.topic,
str(topic_partition.partition),
)
self._consumer_offsets[(consumer_group, topic, partition)] = offset
consumer_offsets[(consumer_group, topic, partition)] = offset
except KafkaException as e:
self.log.debug("Failed to read consumer offsets for %s: %s", consumer_group, e)

return consumer_offsets

def _validate_consumer_groups(self):
"""Validate any explicitly specified consumer groups.
consumer_groups = {'consumer_group': {'topic': [0, 1]}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,6 @@ def get_highwater_offsets(self, consumer_offsets):
return self.python_kafka_client.get_highwater_offsets(consumer_offsets)
return self.confluent_kafka_client.get_highwater_offsets(consumer_offsets)

def get_highwater_offsets_dict(self):
if self.use_legacy_client:
return self.python_kafka_client.get_highwater_offsets_dict()
return self.confluent_kafka_client.get_highwater_offsets_dict()

def reset_offsets(self):
if self.use_legacy_client:
return self.python_kafka_client.reset_offsets()
return self.confluent_kafka_client.reset_offsets()

def get_partitions_for_topic(self, topic):
if self.use_legacy_client:
return self.python_kafka_client.get_partitions_for_topic(topic)
Expand All @@ -45,8 +35,3 @@ def request_metadata_update(self):
if self.use_legacy_client:
return self.python_kafka_client.request_metadata_update()
return self.confluent_kafka_client.request_metadata_update()

def get_consumer_offsets_dict(self):
if self.use_legacy_client:
return self.python_kafka_client.get_consumer_offsets_dict()
return self.confluent_kafka_client.get_consumer_offsets_dict()
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,16 @@ def __init__(self, config, tls_context, log) -> None:
self.config = config
self.log = log
self._kafka_client = None
self._highwater_offsets = {}
self._consumer_offsets = {}
self._tls_context = tls_context

@abstractmethod
def get_consumer_offsets(self):
pass

@abstractmethod
def get_consumer_offsets_dict(self):
pass

@abstractmethod
def get_highwater_offsets(self):
pass

@abstractmethod
def get_highwater_offsets_dict(self):
pass

@abstractmethod
def reset_offsets(self):
pass

@abstractmethod
def get_partitions_for_topic(self, topic):
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def get_consumer_offsets(self):
# Store the list of futures on the object because some of the callbacks create/store additional futures and they
# don't have access to variables scoped to this method, only to the object scope
self._consumer_futures = []
consumer_offsets = {}

if self.config._monitor_unlisted_consumer_groups:
for broker in self.kafka_client._client.cluster.brokers():
Expand All @@ -59,13 +60,13 @@ def get_consumer_offsets(self):
# Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official
# implementation for this function instead.
list_groups_future = self._list_consumer_groups_send_request(broker.nodeId)
list_groups_future.add_callback(self._list_groups_callback, broker.nodeId)
list_groups_future.add_callback(self._list_groups_callback, broker.nodeId, consumer_offsets)
self._consumer_futures.append(list_groups_future)
elif self.config._consumer_groups:
self._validate_consumer_groups()
for consumer_group in self.config._consumer_groups:
find_coordinator_future = self._find_coordinator_id_send_request(consumer_group)
find_coordinator_future.add_callback(self._find_coordinator_callback, consumer_group)
find_coordinator_future.add_callback(self._find_coordinator_callback, consumer_group, consumer_offsets)
self._consumer_futures.append(find_coordinator_future)
else:
raise ConfigurationError(
Expand All @@ -77,6 +78,8 @@ def get_consumer_offsets(self):
self.kafka_client._wait_for_futures(self._consumer_futures)
del self._consumer_futures # since it's reset on every check run, no sense holding the reference between runs

return consumer_offsets

def get_highwater_offsets(self, consumer_offsets):
"""Fetch highwater offsets for topic_partitions in the Kafka cluster.

Expand All @@ -103,6 +106,9 @@ def get_highwater_offsets(self, consumer_offsets):
# If we aren't fetching all broker highwater offsets, then construct the unique set of topic partitions for
# which this run of the check has at least once saved consumer offset. This is later used as a filter for
# excluding partitions.

highwater_offsets = {}

if not self.config._monitor_all_broker_highwatermarks:
tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in consumer_offsets}

Expand Down Expand Up @@ -138,12 +144,14 @@ def get_highwater_offsets(self, consumer_offsets):
# is possible that the wakeup itself could block if a large number of sends were processed beforehand.
highwater_future = self._send_request_to_node(node_id=broker.nodeId, request=request, wakeup=False)

highwater_future.add_callback(self._highwater_offsets_callback)
highwater_future.add_callback(self._highwater_offsets_callback, highwater_offsets)
highwater_futures.append(highwater_future)

# Loop until all futures resolved.
self.kafka_client._wait_for_futures(highwater_futures)

return highwater_offsets

def create_kafka_admin_client(self):
crlfile = self.config._crlfile
if crlfile:
Expand Down Expand Up @@ -198,15 +206,15 @@ def kafka_client(self):
self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version)
return self._kafka_client

def _highwater_offsets_callback(self, response):
def _highwater_offsets_callback(self, highwater_offsets, response):
"""Callback that parses an OffsetFetchResponse and saves it to the highwater_offsets dict."""
if type(response) not in OffsetResponse:
raise RuntimeError("response type should be OffsetResponse, but instead was %s." % type(response))
for topic, partitions_data in response.topics:
for partition, error_code, offsets in partitions_data:
error_type = kafka_errors.for_code(error_code)
if error_type is kafka_errors.NoError:
self._highwater_offsets[(topic, partition)] = offsets[0]
highwater_offsets[(topic, partition)] = offsets[0]
elif error_type is kafka_errors.NotLeaderForPartitionError:
self.log.warning(
"Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen "
Expand Down Expand Up @@ -267,7 +275,7 @@ def _validate_consumer_groups(self):
for partition in partitions:
assert isinstance(partition, int)

def _list_groups_callback(self, broker_id, response):
def _list_groups_callback(self, broker_id, consumer_offsets, response):
"""Callback that takes a ListGroupsResponse and issues an OffsetFetchRequest for each group.

broker_id must be manually passed in because it is not present in the response. Keeping track of the broker that
Expand All @@ -281,10 +289,12 @@ def _list_groups_callback(self, broker_id, response):
single_group_offsets_future = self._list_consumer_group_offsets_send_request(
group_id=consumer_group, group_coordinator_id=broker_id
)
single_group_offsets_future.add_callback(self._single_group_offsets_callback, consumer_group)
single_group_offsets_future.add_callback(
self._single_group_offsets_callback, consumer_group, consumer_offsets
)
self._consumer_futures.append(single_group_offsets_future)

def _find_coordinator_callback(self, consumer_group, response):
def _find_coordinator_callback(self, consumer_group, consumer_offsets, response):
"""Callback that takes a FindCoordinatorResponse and issues an OffsetFetchRequest for the group.

consumer_group must be manually passed in because it is not present in the response, but we need it in order to
Expand All @@ -308,10 +318,10 @@ def _find_coordinator_callback(self, consumer_group, response):
single_group_offsets_future = self._list_consumer_group_offsets_send_request(
group_id=consumer_group, group_coordinator_id=coordinator_id, partitions=topic_partitions
)
single_group_offsets_future.add_callback(self._single_group_offsets_callback, consumer_group)
single_group_offsets_future.add_callback(self._single_group_offsets_callback, consumer_group, consumer_offsets)
self._consumer_futures.append(single_group_offsets_future)

def _single_group_offsets_callback(self, consumer_group, response):
def _single_group_offsets_callback(self, consumer_group, consumer_offsets, response):
"""Callback that parses an OffsetFetchResponse and saves it to the consumer_offsets dict.

consumer_group must be manually passed in because it is not present in the response, but we need it in order to
Expand All @@ -327,7 +337,7 @@ def _single_group_offsets_callback(self, consumer_group, response):
self.kafka_client._client.cluster.request_update() # force metadata update on next poll()
continue
key = (consumer_group, topic, partition)
self._consumer_offsets[key] = offset
consumer_offsets[key] = offset

def _list_consumer_groups_send_request(self, broker_id):
kafka_version = self.kafka_client._matching_api_version(ListGroupsRequest)
Expand Down Expand Up @@ -383,18 +393,8 @@ def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_
)
return self._send_request_to_node(group_coordinator_id, request, wakeup=False)

def get_highwater_offsets_dict(self):
return self._highwater_offsets

def get_consumer_offsets_dict(self):
return self._consumer_offsets

def get_partitions_for_topic(self, topic):
return self.kafka_client._client.cluster.partitions_for_topic(topic)

def request_metadata_update(self):
self.kafka_client._client.cluster.request_update()

def reset_offsets(self):
self._consumer_offsets = {}
self._highwater_offsets = {}
18 changes: 7 additions & 11 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,29 @@ def __init__(self, name, init_config, instances):
def check(self, _):
"""The main entrypoint of the check."""
# Fetch Kafka consumer offsets
self.client.reset_offsets()

consumer_offsets = {}

try:
self.client.get_consumer_offsets()
# Fetch consumer offsets
# Expected format: {(consumer_group, topic, partition): offset}
consumer_offsets = self.client.get_consumer_offsets()
except Exception:
self.log.exception("There was a problem collecting consumer offsets from Kafka.")
# don't raise because we might get valid broker offsets

# Fetch consumer offsets
# Expected format: {(consumer_group, topic, partition): offset}
consumer_offsets = self.client.get_consumer_offsets_dict()

# Fetch the broker highwater offsets
highwater_offsets = {}
try:
if len(consumer_offsets) < self._context_limit:
self.client.get_highwater_offsets(consumer_offsets)
highwater_offsets = self.client.get_highwater_offsets(consumer_offsets)
else:
self.warning("Context limit reached. Skipping highwater offset collection.")
except Exception:
self.log.exception("There was a problem collecting the highwater mark offsets.")
# Unlike consumer offsets, fail immediately because we can't calculate consumer lag w/o highwater_offsets
raise

# Fetch highwater offsets
# Expected format: {(topic, partition): offset}
highwater_offsets = self.client.get_highwater_offsets_dict()

total_contexts = len(consumer_offsets) + len(highwater_offsets)
if total_contexts >= self._context_limit:
self.warning(
Expand Down
24 changes: 12 additions & 12 deletions kafka_consumer/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ def test_when_consumer_lag_less_than_zero_then_emit_event(
# highwater_offset = {(topic, partition): offset}
highwater_offset = {("topic1", "partition1"): 1}
mock_client = mock.MagicMock()
mock_client.get_consumer_offsets_dict.return_value = consumer_offset
mock_client.get_highwater_offsets_dict.return_value = highwater_offset
mock_client.get_consumer_offsets.return_value = consumer_offset
mock_client.get_highwater_offsets.return_value = highwater_offset
mock_client.get_partitions_for_topic.return_value = ['partition1']
mock_generic_client.return_value = mock_client

Expand Down Expand Up @@ -174,8 +174,8 @@ def test_when_partition_is_none_then_emit_warning_log(
# highwater_offset = {(topic, partition): offset}
highwater_offset = {("topic1", "partition1"): 1}
mock_client = mock.MagicMock()
mock_client.get_consumer_offsets_dict.return_value = consumer_offset
mock_client.get_highwater_offsets_dict.return_value = highwater_offset
mock_client.get_consumer_offsets.return_value = consumer_offset
mock_client.get_highwater_offsets.return_value = highwater_offset
mock_client.get_partitions_for_topic.return_value = None
mock_generic_client.return_value = mock_client
caplog.set_level(logging.WARNING)
Expand Down Expand Up @@ -217,8 +217,8 @@ def test_when_partition_not_in_partitions_then_emit_warning_log(
# highwater_offset = {(topic, partition): offset}
highwater_offset = {("topic1", "partition1"): 1}
mock_client = mock.MagicMock()
mock_client.get_consumer_offsets_dict.return_value = consumer_offset
mock_client.get_highwater_offsets_dict.return_value = highwater_offset
mock_client.get_consumer_offsets.return_value = consumer_offset
mock_client.get_highwater_offsets.return_value = highwater_offset
mock_client.get_partitions_for_topic.return_value = ['partition2']
mock_generic_client.return_value = mock_client
caplog.set_level(logging.WARNING)
Expand Down Expand Up @@ -260,18 +260,18 @@ def test_when_highwater_metric_count_hit_context_limit_then_no_more_highwater_me
# highwater_offset = {(topic, partition): offset}
highwater_offset = {("topic1", "partition1"): 3, ("topic2", "partition2"): 3}
mock_client = mock.MagicMock()
mock_client.get_consumer_offsets_dict.return_value = consumer_offset
mock_client.get_highwater_offsets_dict.return_value = highwater_offset
mock_client.get_consumer_offsets.return_value = consumer_offset
mock_client.get_highwater_offsets.return_value = highwater_offset
mock_client.get_partitions_for_topic.return_value = ['partition1']
mock_generic_client.return_value = mock_client
caplog.set_level(logging.WARNING)

# When
kafka_consumer_check = KafkaCheck('kafka_consumer', {'max_partition_contexts': 1}, [kafka_instance])
kafka_consumer_check = KafkaCheck('kafka_consumer', {'max_partition_contexts': 2}, [kafka_instance])
dd_run_check(kafka_consumer_check)

# Then
aggregator.assert_metric("kafka.broker_offset", count=1)
aggregator.assert_metric("kafka.broker_offset", count=2)
Comment on lines +270 to +274
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To reviewer: This test with max_partition_contexts set to 1 does not make sense to me because:

  • We mocked get_consumer_offsets_dict to return {("consumer_group1", "topic1", "partition1"): 2}, so its length is 1
  • We mocked get_highwater_offsets_dict to return {("topic1", "partition1"): 3, ("topic2", "partition2"): 3}
  • However, if len(get_consumer_offsets_dict()) >= limit with limit=1, we do not even run the get_highwater_offsets method, so get_highwater_offsets_dict should return an empty dict in that case.
  • See
    highwater_offsets = {}
    try:
    if len(consumer_offsets) < self._context_limit:
    highwater_offsets = self.client.get_highwater_offsets(consumer_offsets)
    else:
    self.warning("Context limit reached. Skipping highwater offset collection.")

To me, this test should run both methods, have the highwater method return more contexts than the limit allows, and then skip the excess contexts. So here we now have 3 contexts (1 consumer offset, 2 highwater offsets) with the limit set to 2, and we should only emit 2 metrics

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that is a good catch! I should have added an assert here to verify that get_highwater_offsets() was actually called in the first place, rather than just returning a mocked get_highwater_offsets_dict().

aggregator.assert_metric("kafka.consumer_offset", count=0)
aggregator.assert_metric("kafka.consumer_lag", count=0)

Expand All @@ -290,8 +290,8 @@ def test_when_consumer_metric_count_hit_context_limit_then_no_more_consumer_metr
# highwater_offset = {(topic, partition): offset}
highwater_offset = {("topic1", "partition1"): 3, ("topic2", "partition2"): 3}
mock_client = mock.MagicMock()
mock_client.get_consumer_offsets_dict.return_value = consumer_offset
mock_client.get_highwater_offsets_dict.return_value = highwater_offset
mock_client.get_consumer_offsets.return_value = consumer_offset
mock_client.get_highwater_offsets.return_value = highwater_offset
mock_client.get_partitions_for_topic.return_value = ['partition1']
mock_generic_client.return_value = mock_client
caplog.set_level(logging.DEBUG)
Expand Down