Create environments for the new kafka client #14022

Merged
Changes from 1 commit
kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py

@@ -1,14 +1,36 @@
 # (C) Datadog, Inc. 2023-present
 # All rights reserved
 # Licensed under a 3-clause BSD style license (see LICENSE)
+from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient
 
 
-class ConfluentKafkaClient:
-    def __init__(self) -> None:
-        pass
+class ConfluentKafkaClient(KafkaClient):
+    def create_kafka_admin_client(self):
+        raise NotImplementedError
+
+    def get_consumer_offsets_dict(self):
+        raise NotImplementedError
+
+    def get_highwater_offsets(self):
+        raise NotImplementedError
+
+    def get_highwater_offsets_dict(self):
+        raise NotImplementedError
+
+    def reset_offsets(self):
+        raise NotImplementedError
+
+    def get_partitions_for_topic(self, topic):
+        raise NotImplementedError
+
+    def request_metadata_update(self):
+        raise NotImplementedError
+
+    def collect_broker_version(self):
+        raise NotImplementedError
 
     def get_consumer_offsets(self):
-        pass
+        raise NotImplementedError
 
     def get_broker_offset(self):
-        pass
+        raise NotImplementedError
kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py (new file)

@@ -0,0 +1,80 @@
+# (C) Datadog, Inc. 2023-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+
+from datadog_checks.kafka_consumer.client.confluent_kafka_client import ConfluentKafkaClient
+from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient
+from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient
+
+
+class GenericKafkaClient(KafkaClient):
+    def __init__(self, config, tls_context, log) -> None:
+        super().__init__(config, tls_context, log)
+        self.use_legacy_client = config.use_legacy_client
+        self.confluent_kafka_client = ConfluentKafkaClient(config, tls_context, log) if not self.use_legacy_client else None
+        self.python_kafka_client = KafkaPythonClient(config, tls_context, log)
+
+    def get_consumer_offsets(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.get_consumer_offsets()
+        # return self.confluent_kafka_client.get_consumer_offsets()
+
+        return self.python_kafka_client.get_consumer_offsets()
+
+    def get_highwater_offsets(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.get_highwater_offsets()
+        # return self.confluent_kafka_client.get_highwater_offsets()
+        return self.python_kafka_client.get_highwater_offsets()
+
+    def get_highwater_offsets_dict(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.get_highwater_offsets_dict()
+        # return self.confluent_kafka_client.get_highwater_offsets_dict()
+        return self.python_kafka_client.get_highwater_offsets_dict()
+
+    def reset_offsets(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.reset_offsets()
+        # return self.confluent_kafka_client.reset_offsets()
+        return self.python_kafka_client.reset_offsets()
+
+    def get_partitions_for_topic(self, topic):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.get_partitions_for_topic(topic)
+        # return self.confluent_kafka_client.get_partitions_for_topic(topic)
+        return self.python_kafka_client.get_partitions_for_topic(topic)
+
+    def request_metadata_update(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.request_metadata_update()
+        # return self.confluent_kafka_client.request_metadata_update()
+        return self.python_kafka_client.request_metadata_update()
+
+    def collect_broker_version(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.collect_broker_version()
+        # return self.confluent_kafka_client.collect_broker_version()
+        return self.python_kafka_client.collect_broker_version()
+
+    def get_consumer_offsets_dict(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.get_consumer_offsets_dict()
+        # return self.confluent_kafka_client.get_consumer_offsets_dict()
+        return self.python_kafka_client.get_consumer_offsets_dict()
+
+    def create_kafka_admin_client(self):
Contributor: FYI, I think we will eventually need to remove create_kafka_admin_client() from the kafka_python_client.py code: it is vestigial from the legacy implementation and was only kept because the tests still use it. I'm OK with keeping this for now, but I don't think we need to implement a similar function in confluent_kafka_client.

Member Author: Yeah, I agree. I did not do it in this PR to keep the focus on the global test structure. We can remove it in follow-up PRs :)

+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.create_kafka_admin_client()
+        # return self.confluent_kafka_client.create_kafka_admin_client()
+
+        return self.python_kafka_client.create_kafka_admin_client()
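Every delegating method above repeats the same TODO branch. Once ConfluentKafkaClient is implemented, the repetition could collapse into a single dispatch property. The sketch below is a possible follow-up, not part of this PR; the `_client` property is hypothetical:

from datadog_checks.kafka_consumer.client.confluent_kafka_client import ConfluentKafkaClient
from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient
from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient


class GenericKafkaClient(KafkaClient):
    def __init__(self, config, tls_context, log) -> None:
        super().__init__(config, tls_context, log)
        self.use_legacy_client = config.use_legacy_client
        self.python_kafka_client = KafkaPythonClient(config, tls_context, log)
        self.confluent_kafka_client = (
            None if self.use_legacy_client else ConfluentKafkaClient(config, tls_context, log)
        )

    @property
    def _client(self):
        # Resolve the concrete implementation once instead of branching in every method.
        return self.python_kafka_client if self.use_legacy_client else self.confluent_kafka_client

    def get_consumer_offsets(self):
        return self._client.get_consumer_offsets()

    def get_highwater_offsets(self):
        return self._client.get_highwater_offsets()

    # ...and likewise for the remaining delegating methods.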
kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py

@@ -5,13 +5,46 @@
 
 
 class KafkaClient(ABC):
-    def __init__(self) -> None:
+    def __init__(self, config, tls_context, log) -> None:
+        self.config = config
+        self.log = log
+        self._kafka_client = None
+        self._highwater_offsets = {}
+        self._consumer_offsets = {}
+        self._tls_context = tls_context
+
+    @abstractmethod
+    def create_kafka_admin_client(self):
+        pass
 
     @abstractmethod
     def get_consumer_offsets(self):
         pass
 
+    @abstractmethod
+    def get_consumer_offsets_dict(self):
+        pass
+
+    @abstractmethod
+    def get_highwater_offsets(self):
+        pass
+
+    @abstractmethod
+    def get_highwater_offsets_dict(self):
+        pass
+
+    @abstractmethod
+    def reset_offsets(self):
+        pass
+
+    @abstractmethod
+    def get_partitions_for_topic(self, topic):
+        pass
+
+    @abstractmethod
+    def request_metadata_update(self):
+        pass
+
+    @abstractmethod
+    def collect_broker_version(self):
+        pass
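Because every method on the base class is decorated with @abstractmethod, Python refuses to instantiate a subclass until all of them are overridden. A quick illustrative sketch of that mechanism (standard abc behavior, not code from the PR):

from abc import ABC, abstractmethod


class Client(ABC):
    @abstractmethod
    def get_consumer_offsets(self):
        pass


class Incomplete(Client):
    pass


class Complete(Client):
    def get_consumer_offsets(self):
        return {}


# Incomplete()  # raises TypeError: Can't instantiate abstract class Incomplete
print(Complete().get_consumer_offsets())  # {} -- fine once everything is implemented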

kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client_factory.py: This file was deleted.

kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py

@@ -29,14 +29,6 @@ def token(self):
 
 
 class KafkaPythonClient(KafkaClient):
-    def __init__(self, config, tls_context, log) -> None:
Contributor: How come we wouldn't need this part anymore?

Member Author: It moved up into the KafkaClient base class (see kafka_client.py above), so KafkaPythonClient inherits it now.

Contributor: Ah gotcha, missed that!

-        self.config = config
-        self.log = log
-        self._kafka_client = None
-        self._highwater_offsets = {}
-        self._consumer_offsets = {}
-        self._tls_context = tls_context
-
     def get_consumer_offsets(self):
         """Fetch Consumer Group offsets from Kafka.
1 change: 1 addition & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/config.py
@@ -38,6 +38,7 @@ def __init__(self, init_config, instance) -> None:
         self._sasl_kerberos_service_name = instance.get('sasl_kerberos_service_name', 'kafka')
         self._sasl_kerberos_domain_name = instance.get('sasl_kerberos_domain_name')
         self._sasl_oauth_token_provider = instance.get('sasl_oauth_token_provider')
+        self.use_legacy_client = is_affirmative(instance.get('use_legacy_client', False))
 
     def validate_config(self):
         if not self._kafka_connect_str:
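The new flag goes through is_affirmative, so the usual string spellings coming from a YAML instance are accepted as well as booleans. A small sketch of the expected behavior (illustrative values, based on how the helper is used above rather than on its test suite):

from datadog_checks.base import is_affirmative

# is_affirmative normalizes common truthy spellings from YAML configs.
print(is_affirmative(True))    # True
print(is_affirmative('true'))  # True
print(is_affirmative('yes'))   # True
print(is_affirmative(0))       # False
print(is_affirmative(None))    # False (the default when the option is absent)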
kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

@@ -4,7 +4,7 @@
 from time import time
 
 from datadog_checks.base import AgentCheck
-from datadog_checks.kafka_consumer.client.kafka_client_factory import make_client
+from datadog_checks.kafka_consumer.client.generic_kafka_client import GenericKafkaClient
 from datadog_checks.kafka_consumer.config import KafkaConfig
 
 
@@ -25,8 +25,7 @@ def __init__(self, name, init_config, instances):
         super(KafkaCheck, self).__init__(name, init_config, instances)
         self.config = KafkaConfig(self.init_config, self.instance)
         self._context_limit = self.config._context_limit
-        tls_context = self.get_tls_context()
-        self.client = make_client(self.config, tls_context, self.log)
+        self.client = GenericKafkaClient(self.config, self.get_tls_context(), self.log)
         self.check_initializations.append(self.config.validate_config)
 
     def check(self, _):
24 changes: 18 additions & 6 deletions kafka_consumer/hatch.toml
@@ -1,5 +1,20 @@
 [env.collectors.datadog-checks]
 
+[envs.default]
+dependencies = [
+    "datadog_checks_tests_helper @ {root:uri}/../datadog_checks_tests_helper",
+]
+e2e-env = false
+
+[envs.default.env-vars]
+ZK_VERSION = "3.6.4"
+LEGACY_CLIENT = "false"
+
+[[envs.default.matrix]]
+python = ["3.8"]
+version = ["1.1", "2.3", "3.3"]
+impl = ["legacy"]
+
 [[envs.default.matrix]]
 python = ["3.8"]
 version = ["1.1", "2.3", "3.3"]

@@ -10,15 +25,12 @@ matrix.version.env-vars = [
     { key = "KAFKA_VERSION", value = "1.1.1", if = ["1.1"] },
     { key = "KAFKA_VERSION", value = "2.3.1", if = ["2.3"] },
     { key = "KAFKA_VERSION", value = "3.3.2", if = ["3.3"] },
-    "ZK_VERSION=3.6.4"
 ]
 
-[envs.default]
-dependencies = [
-    "datadog_checks_tests_helper @ {root:uri}/../datadog_checks_tests_helper",
-]
-e2e-env = false
+matrix.impl.env-vars = [
+    { key = "LEGACY_CLIENT", value = "true", if = ["legacy"] },
+]
 
 [envs.latest.env-vars]
 KAFKA_VERSION = "latest"
 ZK_VERSION = "3.6.4"
+LEGACY_CLIENT = "false"
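With the two matrix tables above, hatch should generate one environment set per client implementation, and only the legacy set exports LEGACY_CLIENT=true. The names below are an assumption about how hatch joins matrix values (Python version prefixed with py, axes joined by dashes), not output captured from this PR:

# Assumed environment names produced by the two [[envs.default.matrix]] tables;
# the impl = ["legacy"] axis adds a suffix and flips LEGACY_CLIENT to "true".
legacy_envs = ['py3.8-1.1-legacy', 'py3.8-2.3-legacy', 'py3.8-3.3-legacy']
new_client_envs = ['py3.8-1.1', 'py3.8-2.3', 'py3.8-3.3']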
2 changes: 2 additions & 0 deletions kafka_consumer/tests/common.py
@@ -4,6 +4,7 @@
 import os
 import socket
 
+from datadog_checks.base import is_affirmative
 from datadog_checks.dev import get_docker_hostname
 from datadog_checks.dev.utils import get_metadata_metrics

@@ -17,6 +18,7 @@
 KAFKA_VERSION = os.environ.get('KAFKA_VERSION')
 BROKER_METRICS = ['kafka.broker_offset']
 CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag']
+LEGACY_CLIENT = is_affirmative(os.environ.get('LEGACY_CLIENT', 'false'))
 
 metrics = BROKER_METRICS + CONSUMER_METRICS
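Since LEGACY_CLIENT is now a module-level constant derived from the environment, tests can branch on which client implementation the current hatch environment exercises. A hypothetical usage sketch (test name and gating are illustrative, not from this PR):

import pytest

from .common import LEGACY_CLIENT


# Skip in the legacy environments; run only where the new client is installed.
@pytest.mark.skipif(LEGACY_CLIENT, reason='Requires the new confluent-kafka client')
def test_new_client_only():
    assert not LEGACY_CLIENT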
26 changes: 18 additions & 8 deletions kafka_consumer/tests/conftest.py
@@ -13,27 +13,36 @@
 from datadog_checks.dev import WaitFor, docker_run
 from datadog_checks.kafka_consumer import KafkaCheck
 
-from .common import DOCKER_IMAGE_PATH, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, TOPICS
+from .common import DOCKER_IMAGE_PATH, HERE, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, LEGACY_CLIENT, TOPICS
 from .runners import KConsumer, Producer
 
 # Dummy TLS certs
 CERTIFICATE_DIR = os.path.join(os.path.dirname(__file__), 'certificate')
 cert = os.path.join(CERTIFICATE_DIR, 'cert.cert')
 private_key = os.path.join(CERTIFICATE_DIR, 'server.pem')
 
-E2E_METADATA = {
-    'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
-    'start_commands': [
-        'apt-get update',
-        'apt-get install -y build-essential',
-    ],
-}
+if LEGACY_CLIENT:
+    E2E_METADATA = {
+        'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
+        'start_commands': [
+            'apt-get update',
+            'apt-get install -y build-essential',
+        ],
+    }
+else:
+    E2E_METADATA = {
+        'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
+        'docker_volumes': [f'{HERE}/scripts/start_commands.sh:/tmp/start_commands.sh'],
+        'start_commands': ['bash /tmp/start_commands.sh'],
+    }
 
 INSTANCE = {
     'kafka_connect_str': KAFKA_CONNECT_STR,
     'tags': ['optional:tag1'],
     'consumer_groups': {'my_consumer': {'marvel': [0]}},
     'broker_requests_batch_size': 1,
+    'use_legacy_client': LEGACY_CLIENT,
 }

@@ -81,6 +90,7 @@ def kafka_instance_tls():
         'tls_cert': cert,
         'tls_private_key': private_key,
         'tls_ca_cert': CERTIFICATE_DIR,
+        'use_legacy_client': LEGACY_CLIENT,
     }
11 changes: 11 additions & 0 deletions kafka_consumer/tests/scripts/start_commands.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+# TODO Remove this script once the library is installed at the agent level
+
+apt-get update
+apt-get install -y --no-install-recommends gcc git libssl-dev g++ make build-essential libsasl2-modules-gssapi-mit krb5-user
+cd /tmp && git clone https://github.com/edenhill/librdkafka.git
+cd librdkafka && git checkout tags/v2.0.2
+./configure && make && make install && ldconfig
+cd ../ && rm -rf librdkafka
+pip install --no-binary confluent-kafka confluent-kafka
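The --no-binary flag forces confluent-kafka to be built from source so it links against the librdkafka just compiled above. As a sanity check after the install, one could confirm the linked library version; this snippet is illustrative, not part of the PR:

import confluent_kafka

# version() reports the Python client, libversion() the underlying librdkafka.
# After this script, libversion() should report 2.0.2 (an assumption based on
# the tag checked out above).
print(confluent_kafka.version())
print(confluent_kafka.libversion())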