Separate config logic
yzhan289 committed Feb 15, 2023
1 parent 7ee7a99 commit 4f4cae8
Showing 3 changed files with 66 additions and 38 deletions.
@@ -30,9 +30,9 @@ def token(self):


 class KafkaPythonClient(KafkaClient):
-    def __init__(self, check, config) -> None:
+    def __init__(self, check, kafka_config) -> None:
         self.check = check
-        self.config = config
+        self.kafka_config = kafka_config
         self.log = check.log
         self._kafka_client = None
         self._highwater_offsets = {}
@@ -62,7 +62,7 @@ def get_consumer_offsets(self):
         # don't have access to variables scoped to this method, only to the object scope
         self._consumer_futures = []

-        if self.check._monitor_unlisted_consumer_groups:
+        if self.kafka_config._monitor_unlisted_consumer_groups:
             for broker in self.kafka_client._client.cluster.brokers():
                 # FIXME: This is using a workaround to skip socket wakeup, which causes blocking
                 # (see https://github.com/dpkp/kafka-python/issues/2286).
@@ -71,16 +71,16 @@ def get_consumer_offsets(self):
                 list_groups_future = self._list_consumer_groups_send_request(broker.nodeId)
                 list_groups_future.add_callback(self._list_groups_callback, broker.nodeId)
                 self._consumer_futures.append(list_groups_future)
-        elif self.check._consumer_groups:
+        elif self.kafka_config._consumer_groups:
             self._validate_consumer_groups()
-            for consumer_group in self.check._consumer_groups:
+            for consumer_group in self.kafka_config._consumer_groups:
                 find_coordinator_future = self._find_coordinator_id_send_request(consumer_group)
                 find_coordinator_future.add_callback(self._find_coordinator_callback, consumer_group)
                 self._consumer_futures.append(find_coordinator_future)
         else:
             raise ConfigurationError(
                 "Cannot fetch consumer offsets because no consumer_groups are specified and "
-                "monitor_unlisted_consumer_groups is %s." % self.check._monitor_unlisted_consumer_groups
+                "monitor_unlisted_consumer_groups is %s." % self.kafka_config._monitor_unlisted_consumer_groups
             )

         # Loop until all futures resolved.
@@ -100,7 +100,7 @@ def report_consumer_offsets_and_lag(self, contexts_limit):
                 )
                 return
             consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
-            consumer_group_tags.extend(self.check._custom_tags)
+            consumer_group_tags.extend(self.kafka_config._custom_tags)

             partitions = self.kafka_client._client.cluster.partitions_for_topic(topic)
             self.log.debug("Received partitions %s for topic %s", partitions, topic)
@@ -176,10 +176,10 @@ def get_highwater_offsets(self):
         # If we aren't fetching all broker highwater offsets, then construct the unique set of topic partitions for
         # which this run of the check has at least once saved consumer offset. This is later used as a filter for
         # excluding partitions.
-        if not self.check._monitor_all_broker_highwatermarks:
+        if not self.kafka_config._monitor_all_broker_highwatermarks:
             tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in self._consumer_offsets}

-        for batch in self.batchify(self.kafka_client._client.cluster.brokers(), self.check._broker_requests_batch_size):
+        for batch in self.batchify(self.kafka_client._client.cluster.brokers(), self.kafka_config._broker_requests_batch_size):
             for broker in batch:
                 broker_led_partitions = self.kafka_client._client.cluster.partitions_for_broker(broker.nodeId)
                 if broker_led_partitions is None:
@@ -191,7 +191,7 @@ def get_highwater_offsets(self):
                 for topic, partition in broker_led_partitions:
                     # No sense fetching highwater offsets for internal topics
                     if topic not in KAFKA_INTERNAL_TOPICS and (
-                        self.check._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset
+                        self.kafka_config._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset
                     ):
                         partitions_grouped_by_topic[topic].append(partition)

@@ -221,7 +221,7 @@ def report_highwater_offsets(self, contexts_limit):
self.log.debug("Reporting broker offset metric")
for (topic, partition), highwater_offset in self._highwater_offsets.items():
broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
broker_tags.extend(self.check._custom_tags)
broker_tags.extend(self.kafka_config._custom_tags)
self.check.gauge('broker_offset', highwater_offset, tags=broker_tags)
reported_contexts += 1
if reported_contexts == contexts_limit:
@@ -242,10 +242,10 @@ def _create_kafka_admin_client(self, api_version):
         return kafka_admin_client

     def _create_kafka_client(self, clazz):
-        kafka_connect_str = self.check.instance.get('kafka_connect_str')
+        kafka_connect_str = self.kafka_config._kafka_connect_str
         if not isinstance(kafka_connect_str, (string_types, list)):
             raise ConfigurationError('kafka_connect_str should be string or list of strings')
-        kafka_version = self.check.instance.get('kafka_client_api_version')
+        kafka_version = self.kafka_config._kafka_version
         if isinstance(kafka_version, str):
             kafka_version = tuple(map(int, kafka_version.split(".")))

@@ -258,22 +258,22 @@ def _create_kafka_client(self, clazz):
         return clazz(
             bootstrap_servers=kafka_connect_str,
             client_id='dd-agent',
-            request_timeout_ms=self.check.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000,
+            request_timeout_ms=self.kafka_config._request_timeout_ms,
             # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for
             # broker version during the bootstrapping process. Note that this returns the first version found, so in
             # a mixed-version cluster this will be a non-deterministic result.
             api_version=kafka_version,
             # While we check for SASL/SSL params, if not present they will default to the kafka-python values for
             # plaintext connections
-            security_protocol=self.check.instance.get('security_protocol', 'PLAINTEXT'),
-            sasl_mechanism=self.check.instance.get('sasl_mechanism'),
-            sasl_plain_username=self.check.instance.get('sasl_plain_username'),
-            sasl_plain_password=self.check.instance.get('sasl_plain_password'),
-            sasl_kerberos_service_name=self.check.instance.get('sasl_kerberos_service_name', 'kafka'),
-            sasl_kerberos_domain_name=self.check.instance.get('sasl_kerberos_domain_name'),
+            security_protocol=self.kafka_config._security_protocol,
+            sasl_mechanism=self.kafka_config._sasl_mechanism,
+            sasl_plain_username=self.kafka_config._sasl_plain_username,
+            sasl_plain_password=self.kafka_config._sasl_plain_password,
+            sasl_kerberos_service_name=self.kafka_config._sasl_kerberos_service_name,
+            sasl_kerberos_domain_name=self.kafka_config._sasl_kerberos_domain_name,
             sasl_oauth_token_provider=(
-                OAuthTokenProvider(**self.check.instance['sasl_oauth_token_provider'])
-                if 'sasl_oauth_token_provider' in self.check.instance
+                OAuthTokenProvider(**self.kafka_config._sasl_oauth_token_provider)
+                if 'sasl_oauth_token_provider' in self.kafka_config.instance
                 else None
             ),
             ssl_context=tls_context,
@@ -285,7 +285,7 @@ def kafka_client(self):
         # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for
         # broker version during the bootstrapping process. Note that this returns the first version found, so in
         # a mixed-version cluster this will be a non-deterministic result.
-        kafka_version = self.check.instance.get('kafka_client_api_version')
+        kafka_version = self.kafka_config._kafka_version
         if isinstance(kafka_version, str):
             kafka_version = tuple(map(int, kafka_version.split(".")))

@@ -357,8 +357,8 @@ def _validate_consumer_groups(self):
         consumer_groups = {'consumer_group': {'topic': [0, 1]}}
         """
-        assert isinstance(self.check._consumer_groups, dict)
-        for consumer_group, topics in self.check._consumer_groups.items():
+        assert isinstance(self.kafka_config._consumer_groups, dict)
+        for consumer_group, topics in self.kafka_config._consumer_groups.items():
             assert isinstance(consumer_group, string_types)
             assert isinstance(topics, dict) or topics is None  # topics are optional
             if topics is not None:
@@ -397,7 +397,7 @@ def _find_coordinator_callback(self, consumer_group, response):
         are unspecified for a topic listed in the config, offsets are fetched for all the partitions within that topic.
         """
         coordinator_id = self.kafka_client._find_coordinator_id_process_response(response)
-        topics = self.check._consumer_groups[consumer_group]
+        topics = self.kafka_config._consumer_groups[consumer_group]
         if not topics:
             topic_partitions = None  # None signals to fetch all known offsets for the consumer group
         else:
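Note: this capture does not preserve the first file's path; from its content it is the KafkaPythonClient implementation under the kafka_consumer client package. Its change is mechanical: every setting formerly read through self.check (or raw self.check.instance.get(...) lookups) is now read from the injected kafka_config object. A minimal sketch of the pattern, using a hypothetical stand-in setting rather than code from this commit:

    # Before: each call site re-reads the raw instance dict and re-applies its own default.
    class OldStyleClient:
        def __init__(self, check):
            self.check = check

        def batch_size(self):
            return self.check.instance.get('broker_requests_batch_size', 30)  # illustrative default

    # After: parsing and defaults happen once, in a config object the client receives.
    class NewStyleClient:
        def __init__(self, check, kafka_config):
            self.check = check
            self.kafka_config = kafka_config

        def batch_size(self):
            return self.kafka_config._broker_requests_batch_size  # already parsed and defaulted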
35 changes: 35 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/config.py
@@ -0,0 +1,35 @@
+from datadog_checks.base import is_affirmative
+from datadog_checks.kafka_consumer.constants import (
+    BROKER_REQUESTS_BATCH_SIZE,
+    CONTEXT_UPPER_BOUND,
+    DEFAULT_KAFKA_TIMEOUT,
+)
+
+
+class KafkaConfig:
+    def __init__(self, init_config, instance) -> None:
+        self.instance = instance
+        self.init_config = init_config
+        self._context_limit = int(init_config.get('max_partition_contexts', CONTEXT_UPPER_BOUND))
+        self._custom_tags = instance.get('tags', [])
+        self._monitor_unlisted_consumer_groups = is_affirmative(instance.get('monitor_unlisted_consumer_groups', False))
+        self._monitor_all_broker_highwatermarks = is_affirmative(
+            instance.get('monitor_all_broker_highwatermarks', False)
+        )
+        self._consumer_groups = instance.get('consumer_groups', {})
+        self._broker_requests_batch_size = instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE)
+
+        self._kafka_connect_str = instance.get('kafka_connect_str')
+
+        self._kafka_version = instance.get('kafka_client_api_version')
+        if isinstance(self._kafka_version, str):
+            self._kafka_version = tuple(map(int, self._kafka_version.split(".")))
+        self._crlfile = instance.get('ssl_crlfile', instance.get('tls_crlfile'))
+        self._request_timeout_ms = init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000
+        self._security_protocol = instance.get('security_protocol', 'PLAINTEXT')
+        self._sasl_mechanism = instance.get('sasl_mechanism')
+        self._sasl_plain_username = instance.get('sasl_plain_username')
+        self._sasl_plain_password = instance.get('sasl_plain_password')
+        self._sasl_kerberos_service_name = instance.get('sasl_kerberos_service_name', 'kafka')
+        self._sasl_kerberos_domain_name = instance.get('sasl_kerberos_domain_name')
+        self._sasl_oauth_token_provider = instance.get('sasl_oauth_token_provider')
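The new KafkaConfig gathers all raw-config access in one place: it captures instance and init_config, applies defaults (security_protocol, sasl_kerberos_service_name), coerces booleans with is_affirmative, converts the kafka_timeout seconds into request_timeout_ms, and parses the version string into a tuple. A usage sketch against the class as written above; sample values are illustrative and it runs only with the datadog_checks packages from this repo installed:

    from datadog_checks.kafka_consumer.config import KafkaConfig

    init_config = {'kafka_timeout': 5}  # seconds
    instance = {
        'kafka_connect_str': 'localhost:9092',
        'kafka_client_api_version': '2.3.1',
        'monitor_unlisted_consumer_groups': 'true',  # YAML values often arrive as strings
    }

    config = KafkaConfig(init_config, instance)
    assert config._kafka_version == (2, 3, 1)                 # version parsed once, at construction
    assert config._request_timeout_ms == 5000                 # seconds converted to milliseconds
    assert config._security_protocol == 'PLAINTEXT'           # default applied centrally
    assert config._monitor_unlisted_consumer_groups is True   # is_affirmative coercion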
17 changes: 5 additions & 12 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -4,9 +4,9 @@

 from datadog_checks.base import AgentCheck, is_affirmative
 from datadog_checks.kafka_consumer.client.kafka_client_factory import make_client
+from datadog_checks.kafka_consumer.config import KafkaConfig

 from .config_models import ConfigMixin
-from .constants import BROKER_REQUESTS_BATCH_SIZE, CONTEXT_UPPER_BOUND


 class KafkaCheck(AgentCheck, ConfigMixin):
@@ -24,17 +24,10 @@ class KafkaCheck(AgentCheck, ConfigMixin):

     def __init__(self, name, init_config, instances):
         super(KafkaCheck, self).__init__(name, init_config, instances)
-        self._context_limit = int(self.init_config.get('max_partition_contexts', CONTEXT_UPPER_BOUND))
-        self._custom_tags = self.instance.get('tags', [])
-        self._monitor_unlisted_consumer_groups = is_affirmative(
-            self.instance.get('monitor_unlisted_consumer_groups', False)
-        )
-        self._monitor_all_broker_highwatermarks = is_affirmative(
-            self.instance.get('monitor_all_broker_highwatermarks', False)
-        )
-        self._consumer_groups = self.instance.get('consumer_groups', {})
-        self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE)
-        self.client = make_client(self, self.config)
+        self.kafka_config = KafkaConfig(self.init_config, self.instance)
+        self._context_limit = self.kafka_config._context_limit
+        self.client = make_client(self, self.kafka_config)

     def check(self, _):
         """The main entrypoint of the check."""
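With this change the check's __init__ shrinks to three lines: build the KafkaConfig, keep the context limit it computed, and hand the config to make_client. The factory's body lives in kafka_client_factory and is not part of this diff; the following is a hypothetical reconstruction of the wiring it implies, with a stub class so the sketch is self-contained. Only the call signature make_client(check, kafka_config) and the client constructor are taken from the diff above:

    # Hypothetical sketch only; the real make_client is not shown in this commit.

    class KafkaPythonClient:  # stub of the class edited in the first changed file
        def __init__(self, check, kafka_config) -> None:
            self.check = check
            self.kafka_config = kafka_config

    def make_client(check, kafka_config):
        # The factory hands every client implementation the same pre-parsed
        # KafkaConfig, so none of them reach back into check.instance directly.
        return KafkaPythonClient(check, kafka_config)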
