Separate config logic #13975

Closed
71 changes: 0 additions & 71 deletions kafka_consumer/assets/configuration/spec.yaml
@@ -10,25 +10,13 @@ files:
WARNING: To avoid blindly collecting offsets and lag for an unbounded number
of partitions (as could be the case after enabling monitor_unlisted_consumer_groups
or monitor_all_broker_highwatermarks), the check collects metrics for at most 500 partitions.

DEPRECATION NOTICE: In the early days of Kafka, consumer offsets were stored in Zookeeper.
So this check currently supports fetching consumer offsets from both Kafka and Zookeeper.
However, Kafka 0.9 (released in 2015) deprecated Zookeeper storage. As a result, we have also
deprecated fetching consumer offsets from Zookeeper.
options:
- name: kafka_timeout
description: Customizes the Kafka connection timeout.
value:
type: integer
example: 5
display_default: 5
- name: zk_timeout
description: |
DEPRECATED: Customizes the ZooKeeper connection timeout.
value:
type: integer
example: 5
display_default: 5
- template: init_config/default
- template: instances
options:
@@ -76,9 +64,6 @@
Each level is optional. Any empty values are fetched from the Kafka cluster.
You can have empty partitions (example: <CONSUMER_NAME_2>), topics (example: <CONSUMER_NAME_3>),
and even consumer_groups. If you omit consumer_groups, you must set `monitor_unlisted_consumer_groups` to true.

Deprecation notice: Omitting various levels works for zookeeper-based consumers. However, all
functionality related to fetching offsets from Zookeeper is deprecated.
value:
type: object
example:
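The spec's actual example value is collapsed in this diff view. Purely as an illustration of the nesting described in the option above, here is a sketch of how the check would see the mapping once the YAML is parsed into a Python dict; every group, topic, and partition name below is a placeholder, not a value from this PR.

# Hypothetical parsed `consumer_groups` value; all names are placeholders.
consumer_groups = {
    "my_consumer_1": {"my_topic_a": [0, 1]},  # explicit topic and partitions
    "my_consumer_2": {"my_topic_a": []},      # empty partitions: fetched from the cluster
    "my_consumer_3": {},                      # empty topics: all topics for this group
}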
@@ -96,8 +81,6 @@ files:
support this feature on older brokers because they do not provide a way to determine the mapping
of consumer groups to topics. For details, see KIP-88. For older Kafka brokers, the consumer groups
must be specified. This requirement only applies to the brokers, not the consumers; they can be any version.

Deprecation notice: Functionality related to consumers fetching offsets from Zookeeper is deprecated.
value:
type: boolean
example: false
@@ -195,57 +178,3 @@
type: integer
example: 30
- template: instances/default
- name: zk_connect_str
description: |
DEPRECATION NOTICE: This option is only used for fetching consumer offsets
from Zookeeper and is deprecated.
Zookeeper endpoints and ports to connect to.
In a production environment, it's often useful to specify multiple
Zookeeper nodes for a single check instance. This way you
only generate a single check process, but if one host goes down,
KafkaClient / KazooClient tries contacting the next host.
Details: https://github.com/DataDog/dd-agent/issues/2943

You may specify a single server like:

zk_connect_str: localhost:2181

or multiple servers like:

zk_connect_str:
- server1:2181
- server2:2181
value:
anyOf:
- type: string
- type: array
items:
type: string
- name: zk_prefix
description: |
DEPRECATION NOTICE: This option is only used for fetching consumer offsets
from Zookeeper and is deprecated.
Zookeeper chroot prefix under which the Kafka data lives in Zookeeper.
If Kafka connects to `my-zookeeper:2181/kafka`, then the `zk_prefix` is `/kafka`.
value:
type: string
example: <ZK_PREFIX>
- name: kafka_consumer_offsets
description: |
DEPRECATION NOTICE: This option is only used for fetching consumer offsets
from Zookeeper and is deprecated.
This setting only applies if `zk_connect_str` is set and cannot work with
`monitor_unlisted_consumer_groups` since the generated list comes from Zookeeper.
Set to true to fetch consumer offsets from both Zookeeper and Kafka.
Set to false to fetch consumer offsets only from Zookeeper.
value:
type: boolean
example: false
display_default: false
- name: data_streams_enabled
description: |
Beta feature to get a lag metric in seconds. It is part of the Data Streams Monitoring product.
value:
type: boolean
example: false
display_default: false
@@ -0,0 +1,21 @@
# (C) Datadog, Inc. 2023-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
class ConfluentKafkaClient:
def __init__(self) -> None:
pass

def get_consumer_offsets(self):
pass

def get_broker_offset(self):
pass

def report_consumer_offset_and_lag(self):
pass

def report_broker_offset(self):
pass

def collect_broker_metadata(self):
pass
@@ -0,0 +1,37 @@
# (C) Datadog, Inc. 2023-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
from abc import ABC, abstractmethod


class KafkaClient(ABC):
def __init__(self, check) -> None:
self.check = check
self.log = check.log
self._kafka_client = None
self._highwater_offsets = {}
self._consumer_offsets = {}
self._context_limit = check._context_limit

def should_get_highwater_offsets(self):
return len(self._consumer_offsets) < self._context_limit

@abstractmethod
def get_consumer_offsets(self):
pass

@abstractmethod
def get_highwater_offsets(self):
pass

@abstractmethod
def report_consumer_offsets_and_lag(self):
pass

@abstractmethod
def report_highwater_offsets(self):
pass

@abstractmethod
def collect_broker_metadata(self):
pass
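A minimal sketch of how a concrete subclass could satisfy this abstract interface, using hard-coded values purely for illustration. The class name, the (group, topic, partition) key layout, and the logging calls are assumptions made for the example; they are not code from this PR or from the real KafkaPythonClient.

from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient


class IllustrativeKafkaClient(KafkaClient):
    """Hypothetical subclass; not part of the integration."""

    def get_consumer_offsets(self):
        # Assumed key layout: (consumer_group, topic, partition) -> committed offset.
        self._consumer_offsets = {("my_group", "my_topic", 0): 42}

    def get_highwater_offsets(self):
        # Respect the shared context limit before collecting more partitions.
        if self.should_get_highwater_offsets():
            # Assumed key layout: (topic, partition) -> highwater offset.
            self._highwater_offsets = {("my_topic", 0): 45}

    def report_consumer_offsets_and_lag(self):
        for (group, topic, partition), offset in self._consumer_offsets.items():
            highwater = self._highwater_offsets.get((topic, partition))
            if highwater is not None:
                self.log.debug("Lag for %s on %s[%s]: %s", group, topic, partition, highwater - offset)

    def report_highwater_offsets(self):
        for (topic, partition), offset in self._highwater_offsets.items():
            self.log.debug("Highwater offset for %s[%s]: %s", topic, partition, offset)

    def collect_broker_metadata(self):
        # Nothing to collect in this illustration.
        pass

In the actual integration the values would presumably be submitted as metrics through the check rather than logged.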
@@ -0,0 +1,6 @@
from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient
from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient


def make_client(check, config) -> KafkaClient:
return KafkaPythonClient(check, config)
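A short usage sketch of the factory together with the abstract interface above, as it might be driven from the check; the call order and the `self` / `self.config` names are assumptions for illustration, not code from this PR.

# Inside a hypothetical check method; `self` is the check instance and
# `self.config` an assumed configuration object (illustrative names only).
client = make_client(self, self.config)
client.get_consumer_offsets()
if client.should_get_highwater_offsets():
    client.get_highwater_offsets()
    client.report_highwater_offsets()
client.report_consumer_offsets_and_lag()
client.collect_broker_metadata()

Keeping construction behind this single factory means a future confluent-kafka based client (the ConfluentKafkaClient stub above) could be swapped in here without touching the check itself.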