Skip to content

Commit

Permalink
Move consumer group validation to config class
Browse files Browse the repository at this point in the history
  • Loading branch information
alopezz committed Apr 18, 2023
1 parent 05b1954 commit 8385026
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
23 changes: 0 additions & 23 deletions kafka_consumer/datadog_checks/kafka_consumer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ def _get_consumer_groups(self):
self.log.error("Failed to collect consumer groups: %s", e)
return consumer_groups
elif self.config._consumer_groups:
self._validate_consumer_groups()
return self.config._consumer_groups
else:
raise ConfigurationError(
Expand All @@ -192,28 +191,6 @@ def _get_consumer_offset_futures(self, consumer_groups):
[ConsumerGroupTopicPartitions(consumer_group, [topic_partition])]
)[consumer_group]

def _validate_consumer_groups(self):
"""Validate any explicitly specified consumer groups.
consumer_groups = {'consumer_group': {'topic': [0, 1]}}
"""
if not isinstance(self.config._consumer_groups, dict):
raise ConfigurationError("consumer_groups is not a dictionary")
for consumer_group, topics in self.config._consumer_groups.items():
if not isinstance(consumer_group, str):
raise ConfigurationError("consumer group is not a valid string")
if not (isinstance(topics, dict) or topics is None): # topics are optional
raise ConfigurationError("Topics is not a dictionary")
if topics is not None:
for topic, partitions in topics.items():
if not isinstance(topic, str):
raise ConfigurationError("Topic is not a valid string")
if not (isinstance(partitions, (list, tuple)) or partitions is None): # partitions are optional
raise ConfigurationError("Partitions is not a list or tuple")
if partitions is not None:
for partition in partitions:
if not isinstance(partition, int):
raise ConfigurationError("Partition is not a valid integer")

def _get_topic_partitions(self, topics, consumer_group):
for topic in topics.topics:
if topic in KAFKA_INTERNAL_TOPICS:
Expand Down
23 changes: 23 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def validate_config(self):

if self._consumer_groups and self._consumer_groups_regex:
self.log.warning("Using consumer_groups and consumer_groups_regex, will combine the two config options.")
self._validate_consumer_groups()

def _compile_regex(self, consumer_groups_regex):
patterns = {}
Expand All @@ -104,3 +105,25 @@ def _compile_regex(self, consumer_groups_regex):
patterns[consumer_group_pattern].update({topic_pattern: partitions})

return patterns

def _validate_consumer_groups(self):
"""Validate any explicitly specified consumer groups.
consumer_groups = {'consumer_group': {'topic': [0, 1]}}
"""
if not isinstance(self._consumer_groups, dict):
raise ConfigurationError("consumer_groups is not a dictionary")
for consumer_group, topics in self._consumer_groups.items():
if not isinstance(consumer_group, str):
raise ConfigurationError("consumer group is not a valid string")
if not (isinstance(topics, dict) or topics is None): # topics are optional
raise ConfigurationError("Topics is not a dictionary")
if topics is not None:
for topic, partitions in topics.items():
if not isinstance(topic, str):
raise ConfigurationError("Topic is not a valid string")
if not (isinstance(partitions, (list, tuple)) or partitions is None): # partitions are optional
raise ConfigurationError("Partitions is not a list or tuple")
if partitions is not None:
for partition in partitions:
if not isinstance(partition, int):
raise ConfigurationError("Partition is not a valid integer")

0 comments on commit 8385026

Please sign in to comment.