Correctly handle errors during initialization + code refactor #9626

Merged
merged 4 commits on Jul 23, 2021
553 changes: 79 additions & 474 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

Large diffs are not rendered by default.

104 changes: 19 additions & 85 deletions kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py
@@ -4,45 +4,32 @@
from __future__ import division

from collections import defaultdict
from time import time

from kafka import KafkaClient
from kafka import errors as kafka_errors
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from six import string_types

from datadog_checks.base import AgentCheck, ConfigurationError, is_affirmative
from datadog_checks.base import ConfigurationError, is_affirmative

from .constants import CONTEXT_UPPER_BOUND, DEFAULT_KAFKA_TIMEOUT, KAFKA_INTERNAL_TOPICS
from .constants import KAFKA_INTERNAL_TOPICS


class LegacyKafkaCheck_0_10_2(AgentCheck):
class LegacyKafkaCheck_0_10_2(object):
Contributor:
I'm not convinced this is a better approach for this case; can you explain your rationale for composition over inheritance?

Member (Author):
  • Option 1 is to make LegacyKafkaCheck and NewKafkaCheck both inherit from KafkaConsumerCheck. This can't be done as the agent refuses to load KafkaConsumerCheck if it's subclassed anywhere.
  • Option 2 is to keep a single KafkaCheck class and override the __new__ method to return a LegacyKafkaCheck instance when a certain condition is detected after connecting to Kafka. This is the current situation and it is what the refactor removes: we can't retry that version check by registering it as a check_initialization, because once the class is instantiated we can't change it.
  • Option 3 is to make KafkaConsumer the main class that inherits from AgentCheck, and the only class that does so. We then need a NewKafkaConsumer implementation and a LegacyKafkaCheck implementation; the common code can now live in KafkaConsumer.

With option 3 there are a few design possibilities; I decided to override the __getattr__ method because of the large number of calls to self.gauge, self.submit_metadata, etc. that happen in many places.
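To illustrate, here is a minimal, self-contained sketch of that delegation pattern (the class and metric names are made up for the example; this is not the integration's actual code): the sub-check keeps a reference to the parent check and forwards unresolved attribute lookups to it via __getattr__, so calls like self.gauge or self.log end up on the real AgentCheck instance.

```python
class MainCheck(object):
    """Stands in for the real AgentCheck subclass."""

    def gauge(self, name, value):
        # The real base class would submit a metric; we just print here.
        print("gauge", name, value)

    def check(self, _instance):
        # Pick the implementation at check time (e.g. based on broker version
        # or the presence of zk_connect_str) and delegate to it.
        sub_check = LegacySubCheck(self)
        sub_check.check()


class LegacySubCheck(object):
    """Composition instead of inheritance: holds a reference to the parent check."""

    def __init__(self, parent_check):
        self._parent_check = parent_check

    def __getattr__(self, item):
        # Only invoked when normal attribute lookup fails, so the sub-check's
        # own attributes win; everything else is forwarded to the parent check.
        return getattr(self._parent_check, item)

    def check(self):
        # 'gauge' is not defined on LegacySubCheck, so __getattr__ forwards
        # the lookup to MainCheck.
        self.gauge('kafka.example.metric', 1)


if __name__ == '__main__':
    MainCheck().check(None)
```

Because __getattr__ is only consulted after normal lookup fails, the sub-check can still define its own check(), _create_kafka_client(), etc. without shadowing anything on the parent.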

"""
Check the offsets and lag of Kafka consumers. This check also returns broker highwater offsets.

This is the legacy codepath which is used when either broker version < 0.10.2 or zk_connect_str has a value.
"""

__NAMESPACE__ = 'kafka'
def __init__(self, parent_check):
self._parent_check = parent_check

def __init__(self, name, init_config, instances):
super(LegacyKafkaCheck_0_10_2, self).__init__(name, init_config, instances)
self._context_limit = int(init_config.get('max_partition_contexts', CONTEXT_UPPER_BOUND))
self._custom_tags = self.instance.get('tags', [])

self._monitor_unlisted_consumer_groups = is_affirmative(
self.instance.get('monitor_unlisted_consumer_groups', False)
)
self._monitor_all_broker_highwatermarks = is_affirmative(
self.instance.get('monitor_all_broker_highwatermarks', False)
)
self._consumer_groups = self.instance.get('consumer_groups', {})
# Note: We cannot skip validation if monitor_unlisted_consumer_groups is True because this legacy check only
# supports that functionality for Zookeeper, not Kafka.
self._validate_explicit_consumer_groups()
self.validate_consumer_groups()

self._kafka_client = self._create_kafka_client()
self._zk_hosts_ports = self.instance.get('zk_connect_str')
@@ -60,10 +47,18 @@ def __init__(self, name, init_config, instances):
else:
raise ConfigurationError("zk_connect_str must be a string or list of strings")

self._zk_client = KazooClient(hosts=self._zk_hosts_ports, timeout=int(init_config.get('zk_timeout', 5)))
self._zk_client = KazooClient(
hosts=self._zk_hosts_ports, timeout=int(self.init_config.get('zk_timeout', 5))
)
self._zk_client.start()

def check(self, instance):
def __getattr__(self, item):
try:
return getattr(self._parent_check, item)
except AttributeError:
raise AttributeError("LegacyKafkaCheck_0_10_2 has no attribute called {}".format(item))

def check(self):
"""The main entrypoint of the check."""
self.log.debug("Running legacy Kafka Consumer check.")
self._zk_consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset}
@@ -90,7 +85,7 @@ def check(self, instance):
# Support for storing offsets in Kafka not available until Kafka 0.8.2. Also, for legacy reasons, this check
# only fetches consumer offsets from Kafka if Zookeeper is omitted or kafka_consumer_offsets is True.
if self._kafka_client.config.get('api_version') >= (0, 8, 2) and is_affirmative(
instance.get('kafka_consumer_offsets', self._zk_hosts_ports is None)
self.instance.get('kafka_consumer_offsets', self._zk_hosts_ports is None)
):
try:
self.log.debug('Collecting consumer offsets')
@@ -104,7 +99,7 @@
'Identified api_version: %s, kafka_consumer_offsets: %s, zk_connection_string: %s.'
' Skipping consumer offset collection',
str(self._kafka_client.config.get('api_version')),
str(instance.get('kafka_consumer_offsets')),
str(self.instance.get('kafka_consumer_offsets')),
str(self._zk_hosts_ports),
)

@@ -135,35 +130,7 @@ def check(self, instance):
self._report_consumer_offsets_and_lag(self._zk_consumer_offsets, source='zk')

def _create_kafka_client(self):
kafka_conn_str = self.instance.get('kafka_connect_str')
if not isinstance(kafka_conn_str, (string_types, list)):
raise ConfigurationError('kafka_connect_str should be string or list of strings')
kafka_version = self.instance.get('kafka_client_api_version')
if isinstance(kafka_version, str):
kafka_version = tuple(map(int, kafka_version.split(".")))
kafka_client = KafkaClient(
bootstrap_servers=kafka_conn_str,
client_id='dd-agent',
request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000,
# if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for broker
# version during the bootstrapping process. Note that probing randomly picks a broker to probe, so in a
# mixed-version cluster probing returns a non-deterministic result.
api_version=kafka_version,
# While we check for SSL params, if not present they will default to the kafka-python values for plaintext
# connections
security_protocol=self.instance.get('security_protocol', 'PLAINTEXT'),
sasl_mechanism=self.instance.get('sasl_mechanism'),
sasl_plain_username=self.instance.get('sasl_plain_username'),
sasl_plain_password=self.instance.get('sasl_plain_password'),
sasl_kerberos_service_name=self.instance.get('sasl_kerberos_service_name', 'kafka'),
sasl_kerberos_domain_name=self.instance.get('sasl_kerberos_domain_name'),
ssl_cafile=self.instance.get('ssl_cafile'),
ssl_check_hostname=self.instance.get('ssl_check_hostname', True),
ssl_certfile=self.instance.get('ssl_certfile'),
ssl_keyfile=self.instance.get('ssl_keyfile'),
ssl_crlfile=self.instance.get('ssl_crlfile'),
ssl_password=self.instance.get('ssl_password'),
)
kafka_client = self.create_kafka_client()
# Force initial population of the local cluster metadata cache
kafka_client.poll(future=kafka_client.cluster.request_update())
if kafka_client.cluster.topics(exclude_internal_topics=False) is None:
@@ -453,36 +420,3 @@ def _get_group_coordinator(self, group):
error_type = kafka_errors.for_code(response.error_code)
if error_type is kafka_errors.NoError:
return response.coordinator_id

def _validate_explicit_consumer_groups(self):
"""Validate any explicitly specified consumer groups.

While the check does not require specifying consumer groups,
if they are specified this method should be used to validate them.

consumer_groups = {'consumer_group': {'topic': [0, 1]}}
"""
assert isinstance(self._consumer_groups, dict)
for consumer_group, topics in self._consumer_groups.items():
assert isinstance(consumer_group, string_types)
assert isinstance(topics, dict) or topics is None # topics are optional
if topics is not None:
for topic, partitions in topics.items():
assert isinstance(topic, string_types)
assert isinstance(partitions, (list, tuple)) or partitions is None # partitions are optional
if partitions is not None:
for partition in partitions:
assert isinstance(partition, int)

def _send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
"""Emit an event to the Datadog Event Stream."""
event_dict = {
'timestamp': int(time()),
'msg_title': title,
'event_type': event_type,
'alert_type': severity,
'msg_text': text,
'tags': tags,
'aggregation_key': aggregation_key,
}
self.event(event_dict)