Remove deprecated implementation of kafka_consumer (#13915)
* Remove deprecated implementation of kafka_consumer

* Apply suggestions
yzhan289 authored Feb 9, 2023
1 parent e644556 commit 8e6c44a
Showing 13 changed files with 13 additions and 860 deletions.
64 changes: 0 additions & 64 deletions kafka_consumer/assets/configuration/spec.yaml
@@ -10,25 +10,13 @@ files:
WARNING: To avoid blindly collecting offsets and lag for an unbounded number
of partitions (as could be the case after enabling monitor_unlisted_consumer_groups
or monitor_all_broker_highwatermarks) the check collects metrics for at most 500 partitions.
DEPRECATION NOTICE: In the early days of Kafka, consumer offsets were stored in Zookeeper.
So this check currently supports fetching consumer offsets from both Kafka and Zookeeper.
However, Kafka 0.9 (released in 2015) deprecated Zookeeper storage. As a result, we have also
deprecated fetching consumer offsets from Zookeeper.
options:
- name: kafka_timeout
description: Customizes the Kafka connection timeout.
value:
type: integer
example: 5
display_default: 5
- name: zk_timeout
description: |
DEPRECATED: Customizes the ZooKeeper connection timeout.
value:
type: integer
example: 5
display_default: 5
- template: init_config/default
- template: instances
options:
@@ -76,9 +64,6 @@ files:
Each level is optional. Any empty values are fetched from the Kafka cluster.
You can have empty partitions (example: <CONSUMER_NAME_2>), topics (example: <CONSUMER_NAME_3>),
and even consumer_groups. If you omit consumer_groups, you must set `monitor_unlisted_consumer_groups` to true.
Deprecation notice: Omitting various levels works for zookeeper-based consumers. However, all
functionality related to fetching offsets from Zookeeper is deprecated.
value:
type: object
example:
@@ -96,8 +81,6 @@ files:
support this feature on older brokers because they do not provide a way to determine the mapping
of consumer groups to topics. For details, see KIP-88. For older Kafka brokers, the consumer groups
must be specified. This requirement only applies to the brokers, not the consumers--they can be any version.
Deprecation notice: Functionality related to consumers fetching offsets from Zookeeper is deprecated.
value:
type: boolean
example: false
@@ -195,53 +178,6 @@ files:
type: integer
example: 30
- template: instances/default
- name: zk_connect_str
description: |
DEPRECATION NOTICE: This option is only used for fetching consumer offsets
from Zookeeper and is deprecated.
Zookeeper endpoints and port to connect to.
In a production environment, it's often useful to specify multiple
Zookeeper nodes for a single check instance. This way you
only generate a single check process, but if one host goes down,
KafkaClient / KazooClient tries contacting the next host.
Details: https://github.com/DataDog/dd-agent/issues/2943
You may specify a single server like:
zk_connect_str: localhost:2181
or multiple servers like:
zk_connect_str:
- server1:2181
- server2:2181
value:
anyOf:
- type: string
- type: array
items:
type: string
- name: zk_prefix
description: |
DEPRECATION NOTICE: This option is only used for fetching consumer offsets
from Zookeeper and is deprecated.
Zookeeper chroot prefix under which kafka data is living in zookeeper.
If kafka is connecting to `my-zookeeper:2181/kafka` then the `zk_prefix` is `/kafka`.
value:
type: string
example: <ZK_PREFIX>
- name: kafka_consumer_offsets
description: |
DEPRECATION NOTICE: This option is only used for fetching consumer offsets
from Zookeeper and is deprecated.
This setting only applies if `zk_connect_str` is set and cannot work with
`monitor_unlisted_consumer_groups` since the generated list comes from Zookeeper.
Set to true to fetch consumer offsets from both Zookeeper and Kafka
Set to false to fetch consumer offsets only from Zookeeper.
value:
type: boolean
example: false
display_default: false
- name: data_streams_enabled
description: |
Beta feature to get a lag metric in Seconds. It's part of the Data Streams Monitoring Product
16 changes: 0 additions & 16 deletions kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py
@@ -18,10 +18,6 @@ def shared_service(field, value):
return get_default_field_value(field, value)


def shared_zk_timeout(field, value):
return 5


def instance_broker_requests_batch_size(field, value):
return 30

@@ -46,10 +42,6 @@ def instance_kafka_client_api_version(field, value):
return get_default_field_value(field, value)


def instance_kafka_consumer_offsets(field, value):
return False


def instance_metric_patterns(field, value):
return get_default_field_value(field, value)

@@ -128,11 +120,3 @@ def instance_tls_validate_hostname(field, value):

def instance_tls_verify(field, value):
return True


def instance_zk_connect_str(field, value):
return get_default_field_value(field, value)


def instance_zk_prefix(field, value):
return get_default_field_value(field, value)
3 changes: 0 additions & 3 deletions kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py
@@ -47,7 +47,6 @@ class Config:
empty_default_hostname: Optional[bool]
kafka_client_api_version: Optional[str]
kafka_connect_str: Union[str, Sequence[str]]
kafka_consumer_offsets: Optional[bool]
metric_patterns: Optional[MetricPatterns]
min_collection_interval: Optional[float]
monitor_all_broker_highwatermarks: Optional[bool]
@@ -68,8 +67,6 @@ class Config:
tls_private_key_password: Optional[str]
tls_validate_hostname: Optional[bool]
tls_verify: Optional[bool]
zk_connect_str: Optional[Union[str, Sequence[str]]]
zk_prefix: Optional[str]

@root_validator(pre=True)
def _initial_validation(cls, values):
1 change: 0 additions & 1 deletion kafka_consumer/datadog_checks/kafka_consumer/config_models/shared.py
@@ -25,7 +25,6 @@ class Config:

kafka_timeout: Optional[int]
service: Optional[str]
zk_timeout: Optional[int]

@root_validator(pre=True)
def _initial_validation(cls, values):
55 changes: 0 additions & 55 deletions kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example
@@ -3,11 +3,6 @@
## WARNING: To avoid blindly collecting offsets and lag for an unbounded number
## of partitions (as could be the case after enabling monitor_unlisted_consumer_groups
## or monitor_all_broker_highwatermarks) the check collects metrics for at most 500 partitions.
##
## DEPRECATION NOTICE: In the early days of Kafka, consumer offsets were stored in Zookeeper.
## So this check currently supports fetching consumer offsets from both Kafka and Zookeeper.
## However, Kafka 0.9 (released in 2015) deprecated Zookeeper storage. As a result, we have also
## deprecated fetching consumer offsets from Zookeeper.
#
init_config:

@@ -16,11 +11,6 @@ init_config:
#
# kafka_timeout: 5
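The WARNING at the top of this file says the check collects metrics for at most 500 partitions. Judging from KafkaCheck.__init__ further down in this commit, which reads max_partition_contexts from init_config with CONTEXT_UPPER_BOUND as the fallback, that cap appears tunable; a minimal sketch, assuming that option name, would be:

init_config:
  max_partition_contexts: 1000  # assumed knob for the partition cap; KafkaCheck.__init__ below falls back to CONTEXT_UPPER_BOUND (500, per the warning above)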

## @param zk_timeout - integer - optional - default: 5
## DEPRECATED: Customizes the ZooKeeper connection timeout.
#
# zk_timeout: 5

## @param service - string - optional
## Attach the tag `service:<SERVICE>` to every metric, event, and service check emitted by this integration.
##
@@ -68,9 +58,6 @@ instances:
## Each level is optional. Any empty values are fetched from the Kafka cluster.
## You can have empty partitions (example: <CONSUMER_NAME_2>), topics (example: <CONSUMER_NAME_3>),
## and even consumer_groups. If you omit consumer_groups, you must set `monitor_unlisted_consumer_groups` to true.
##
## Deprecation notice: Omitting various levels works for zookeeper-based consumers. However, all
## functionality related to fetching offsets from Zookeeper is deprecated.
#
# consumer_groups:
# <CONSUMER_NAME_1>:
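As a concrete illustration of the mapping described above, a filled-in block might look like the following sketch (the group, topic, and partition values are placeholders, not part of this commit):

consumer_groups:
  my_consumer_group:   # group names are strings
    my_topic:          # topic names are strings; omit topics to fetch all of the group's topics
      - 0              # partitions are listed as integers
      - 1
  my_other_group: {}   # empty mapping: topics and partitions are fetched from the Kafka cluster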
@@ -91,8 +78,6 @@ instances:
## support this feature on older brokers because they do not provide a way to determine the mapping
## of consumer groups to topics. For details, see KIP-88. For older Kafka brokers, the consumer groups
## must be specified. This requirement only applies to the brokers, not the consumers--they can be any version.
##
## Deprecation notice: Functionality related to consumers fetching offsets from Zookeeper is deprecated.
#
# monitor_unlisted_consumer_groups: false
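Conversely, as the description above notes, an instance may omit consumer_groups entirely and let the check discover groups from the brokers. A minimal sketch, with a placeholder broker address:

instances:
  - kafka_connect_str: localhost:9092       # placeholder endpoint
    monitor_unlisted_consumer_groups: true  # requires Kafka brokers >= 0.10.2 (KIP-88)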

@@ -256,46 +241,6 @@ instances:
# exclude:
# - <EXCLUDE_REGEX>

## @param zk_connect_str - string or list of strings - optional
## DEPRECATION NOTICE: This option is only used for fetching consumer offsets
## from Zookeeper and is deprecated.
## Zookeeper endpoints and port to connect to.
## In a production environment, it's often useful to specify multiple
## Zookeeper nodes for a single check instance. This way you
## only generate a single check process, but if one host goes down,
## KafkaClient / KazooClient tries contacting the next host.
## Details: https://github.com/DataDog/dd-agent/issues/2943
##
## You may specify a single server like:
##
## zk_connect_str: localhost:2181
##
## or multiple servers like:
##
## zk_connect_str:
## - server1:2181
## - server2:2181
#
# zk_connect_str: <ZK_CONNECT_STR>

## @param zk_prefix - string - optional
## DEPRECATION NOTICE: This option is only used for fetching consumer offsets
## from Zookeeper and is deprecated.
## Zookeeper chroot prefix under which kafka data is living in zookeeper.
## If kafka is connecting to `my-zookeeper:2181/kafka` then the `zk_prefix` is `/kafka`.
#
# zk_prefix: <ZK_PREFIX>

## @param kafka_consumer_offsets - boolean - optional - default: false
## DEPRECATION NOTICE: This option is only used for fetching consumer offsets
## from Zookeeper and is deprecated.
## This setting only applies if `zk_connect_str` is set and cannot work with
## `monitor_unlisted_consumer_groups` since the generated list comes from Zookeeper.
## Set to true to fetch consumer offsets from both Zookeeper and Kafka
## Set to false to fetch consumer offsets only from Zookeeper.
#
# kafka_consumer_offsets: false

## @param data_streams_enabled - boolean - optional - default: false
## Beta feature to get a lag metric in Seconds. It's part of the Data Streams Monitoring Product
#
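Opting in to this beta metric, which KafkaCheck.__init__ reads per instance as data_streams_enabled, would presumably be:

data_streams_enabled: true  # instance-level flag; the default is false per the @param line above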
45 changes: 1 addition & 44 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -12,7 +12,6 @@
from datadog_checks.base.utils.http import AuthTokenOAuthReader

from .constants import CONTEXT_UPPER_BOUND, DEFAULT_KAFKA_TIMEOUT
from .legacy_0_10_2 import LegacyKafkaCheck_0_10_2
from .new_kafka_consumer import NewKafkaConsumerCheck


@@ -40,7 +39,6 @@ class KafkaCheck(AgentCheck):

def __init__(self, name, init_config, instances):
super(KafkaCheck, self).__init__(name, init_config, instances)
self.sub_check = None
self._context_limit = int(self.init_config.get('max_partition_contexts', CONTEXT_UPPER_BOUND))
self._data_streams_enabled = is_affirmative(self.instance.get('data_streams_enabled', False))
self._custom_tags = self.instance.get('tags', [])
@@ -52,7 +50,7 @@ def __init__(self, name, init_config, instances):
)
self._consumer_groups = self.instance.get('consumer_groups', {})

self.check_initializations.append(self._init_check_based_on_kafka_version)
self.sub_check = NewKafkaConsumerCheck(self)

def check(self, _):
return self.sub_check.check()
@@ -93,47 +91,6 @@ def validate_consumer_groups(self):
for partition in partitions:
assert isinstance(partition, int)

def _init_check_based_on_kafka_version(self):
"""Set the sub_check attribute before allowing the `check` method to run. If something fails, this method will
be retried regularly."""
self.sub_check = self._make_sub_check()

def _make_sub_check(self):
"""Determine whether to use old legacy KafkaClient implementation or the new KafkaAdminClient implementation.
The legacy version of this check uses the KafkaClient and handrolls things like looking up the GroupCoordinator,
crafting the offset requests, handling errors, etc.
The new implementation uses the KafkaAdminClient which lets us offload most of the Kafka-specific bits onto the
kafka-python library, which is used by many other tools and reduces our maintenance burden.
Unfortunately, the KafkaAdminClient requires brokers >= 0.10.0, so we split the check into legacy and new code.
Furthermore, we took the opportunity to simplify the new code by dropping support for:
1) Zookeeper-based offsets. These have been deprecated since Kafka 0.9.
2) Kafka brokers < 0.10.2. It is impossible to support monitor_unlisted_consumer_groups on these older brokers
because they do not provide a way to determine the mapping of consumer groups to topics. For details, see
KIP-88.
To clarify: This check still allows fetching offsets from zookeeper/older kafka brokers, it just uses the
legacy code path."""
if self.instance.get('zk_connect_str'):
return LegacyKafkaCheck_0_10_2(self)

kafka_version = self.instance.get('kafka_client_api_version')
if isinstance(kafka_version, str):
kafka_version = tuple(map(int, kafka_version.split(".")))
if kafka_version is None: # if unspecified by the user, we have to probe the cluster
kafka_client = self.create_kafka_client()
# version probing happens automatically as part of KafkaClient's __init__()
kafka_version = kafka_client.config['api_version']
kafka_client.close()

if kafka_version < (0, 10, 2):
return LegacyKafkaCheck_0_10_2(self)

return NewKafkaConsumerCheck(self)

def _create_kafka_client(self, clazz):
kafka_connect_str = self.instance.get('kafka_connect_str')
if not isinstance(kafka_connect_str, (string_types, list)):