diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py
index 186dbd4b65e6e..bfeae8302da7b 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py
@@ -1,14 +1,36 @@
 # (C) Datadog, Inc. 2023-present
 # All rights reserved
 # Licensed under a 3-clause BSD style license (see LICENSE)
+from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient
 
 
-class ConfluentKafkaClient:
-    def __init__(self) -> None:
-        pass
+class ConfluentKafkaClient(KafkaClient):
+    def create_kafka_admin_client(self):
+        raise NotImplementedError
+
+    def get_consumer_offsets_dict(self):
+        raise NotImplementedError
+
+    def get_highwater_offsets(self):
+        raise NotImplementedError
+
+    def get_highwater_offsets_dict(self):
+        raise NotImplementedError
+
+    def reset_offsets(self):
+        raise NotImplementedError
+
+    def get_partitions_for_topic(self, topic):
+        raise NotImplementedError
+
+    def request_metadata_update(self):
+        raise NotImplementedError
+
+    def collect_broker_version(self):
+        raise NotImplementedError
 
     def get_consumer_offsets(self):
-        pass
+        raise NotImplementedError
 
     def get_broker_offset(self):
-        pass
+        raise NotImplementedError
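Note on the stubs above: each method raises NotImplementedError instead of pass, so the class still satisfies the KafkaClient abstract base class while any call into the unfinished confluent-kafka path fails loudly rather than silently returning None. A minimal standalone sketch of that pattern (Client and StubClient are illustrative names, not classes from this PR):

    from abc import ABC, abstractmethod


    class Client(ABC):
        @abstractmethod
        def get_consumer_offsets(self):
            pass


    class StubClient(Client):
        # Overriding the abstract method makes the class instantiable;
        # raising keeps any accidental use of the unfinished path loud.
        def get_consumer_offsets(self):
            raise NotImplementedError


    client = StubClient()  # instantiation succeeds: every abstract method is overridden
    try:
        client.get_consumer_offsets()
    except NotImplementedError:
        print("unfinished path fails loudly instead of returning None")
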
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py
new file mode 100644
index 0000000000000..9c039ca37b889
--- /dev/null
+++ b/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py
@@ -0,0 +1,80 @@
+# (C) Datadog, Inc. 2023-present
+# All rights reserved
+# Licensed under a 3-clause BSD style license (see LICENSE)
+
+from datadog_checks.kafka_consumer.client.confluent_kafka_client import ConfluentKafkaClient
+from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient
+from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient
+
+
+class GenericKafkaClient(KafkaClient):
+    def __init__(self, config, tls_context, log) -> None:
+        super().__init__(config, tls_context, log)
+        self.use_legacy_client = config.use_legacy_client
+        self.confluent_kafka_client = ConfluentKafkaClient(config, tls_context, log) if not self.use_legacy_client else None
+        self.python_kafka_client = KafkaPythonClient(config, tls_context, log)
+
+    def get_consumer_offsets(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.get_consumer_offsets()
+        # return self.confluent_kafka_client.get_consumer_offsets()
+
+        return self.python_kafka_client.get_consumer_offsets()
+
+    def get_highwater_offsets(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.get_highwater_offsets()
+        # return self.confluent_kafka_client.get_highwater_offsets()
+        return self.python_kafka_client.get_highwater_offsets()
+
+    def get_highwater_offsets_dict(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.get_highwater_offsets_dict()
+        # return self.confluent_kafka_client.get_highwater_offsets_dict()
+        return self.python_kafka_client.get_highwater_offsets_dict()
+
+    def reset_offsets(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.reset_offsets()
+        # return self.confluent_kafka_client.reset_offsets()
+        return self.python_kafka_client.reset_offsets()
+
+    def get_partitions_for_topic(self, topic):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.get_partitions_for_topic(topic)
+        # return self.confluent_kafka_client.get_partitions_for_topic(topic)
+        return self.python_kafka_client.get_partitions_for_topic(topic)
+
+    def request_metadata_update(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.request_metadata_update()
+        # return self.confluent_kafka_client.request_metadata_update()
+        return self.python_kafka_client.request_metadata_update()
+
+    def collect_broker_version(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.collect_broker_version()
+        # return self.confluent_kafka_client.collect_broker_version()
+        return self.python_kafka_client.collect_broker_version()
+
+    def get_consumer_offsets_dict(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.get_consumer_offsets_dict()
+        # return self.confluent_kafka_client.get_consumer_offsets_dict()
+        return self.python_kafka_client.get_consumer_offsets_dict()
+
+    def create_kafka_admin_client(self):
+        # TODO when this method is implemented in ConfluentKafkaClient, replace this with:
+        # if self.use_legacy_client:
+        #     return self.python_kafka_client.create_kafka_admin_client()
+        # return self.confluent_kafka_client.create_kafka_admin_client()
+
+        return self.python_kafka_client.create_kafka_admin_client()
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py
index 309ed845137c7..2d8742f0f82bb 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py
@@ -5,13 +5,46 @@
 
 
 class KafkaClient(ABC):
-    def __init__(self) -> None:
+    def __init__(self, config, tls_context, log) -> None:
+        self.config = config
+        self.log = log
+        self._kafka_client = None
+        self._highwater_offsets = {}
+        self._consumer_offsets = {}
+        self._tls_context = tls_context
+
+    @abstractmethod
+    def create_kafka_admin_client(self):
         pass
 
     @abstractmethod
     def get_consumer_offsets(self):
         pass
 
+    @abstractmethod
+    def get_consumer_offsets_dict(self):
+        pass
+
     @abstractmethod
     def get_highwater_offsets(self):
         pass
+
+    @abstractmethod
+    def get_highwater_offsets_dict(self):
+        pass
+
+    @abstractmethod
+    def reset_offsets(self):
+        pass
+
+    @abstractmethod
+    def get_partitions_for_topic(self, topic):
+        pass
+
+    @abstractmethod
+    def request_metadata_update(self):
+        pass
+
+    @abstractmethod
+    def collect_broker_version(self):
+        pass
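GenericKafkaClient is a facade: today every method falls through to KafkaPythonClient, and the TODO comments show the dispatch each body will grow once ConfluentKafkaClient is implemented. A standalone miniature of that flag-based dispatch (LegacyClient, NewClient, and GenericClient are illustrative stand-ins for KafkaPythonClient, ConfluentKafkaClient, and GenericKafkaClient):

    class LegacyClient:  # stand-in for KafkaPythonClient
        def get_consumer_offsets(self):
            return {"backend": "kafka-python"}


    class NewClient:  # stand-in for ConfluentKafkaClient
        def get_consumer_offsets(self):
            return {"backend": "confluent-kafka"}


    class GenericClient:  # stand-in for GenericKafkaClient
        def __init__(self, use_legacy_client):
            self.use_legacy_client = use_legacy_client
            self.python_client = LegacyClient()
            self.confluent_client = NewClient() if not use_legacy_client else None

        def _backend(self):
            # The one place where the feature flag picks an implementation.
            return self.python_client if self.use_legacy_client else self.confluent_client

        def get_consumer_offsets(self):
            return self._backend().get_consumer_offsets()


    print(GenericClient(use_legacy_client=True).get_consumer_offsets())   # {'backend': 'kafka-python'}
    print(GenericClient(use_legacy_client=False).get_consumer_offsets())  # {'backend': 'confluent-kafka'}
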
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client_factory.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client_factory.py
deleted file mode 100644
index 9f8010ca7b10a..0000000000000
--- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client_factory.py
+++ /dev/null
@@ -1,10 +0,0 @@
-# (C) Datadog, Inc. 2023-present
-# All rights reserved
-# Licensed under a 3-clause BSD style license (see LICENSE)
-
-from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient
-from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient
-
-
-def make_client(config, tls_context, log) -> KafkaClient:
-    return KafkaPythonClient(config, tls_context, log)
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py
index b962a7390293f..6062904aa56cc 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py
@@ -29,14 +29,6 @@ def token(self):
 
 
 class KafkaPythonClient(KafkaClient):
-    def __init__(self, config, tls_context, log) -> None:
-        self.config = config
-        self.log = log
-        self._kafka_client = None
-        self._highwater_offsets = {}
-        self._consumer_offsets = {}
-        self._tls_context = tls_context
-
     def get_consumer_offsets(self):
         """Fetch Consumer Group offsets from Kafka.
 
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config.py b/kafka_consumer/datadog_checks/kafka_consumer/config.py
index 871159238f2ea..f6264c320b9e9 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/config.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/config.py
@@ -38,6 +38,7 @@ def __init__(self, init_config, instance) -> None:
         self._sasl_kerberos_service_name = instance.get('sasl_kerberos_service_name', 'kafka')
         self._sasl_kerberos_domain_name = instance.get('sasl_kerberos_domain_name')
         self._sasl_oauth_token_provider = instance.get('sasl_oauth_token_provider')
+        self.use_legacy_client = is_affirmative(instance.get('use_legacy_client', False))
 
     def validate_config(self):
         if not self._kafka_connect_str:
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
index c8f911a367e91..fbcfb03cdad17 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -4,7 +4,7 @@
 from time import time
 
 from datadog_checks.base import AgentCheck
-from datadog_checks.kafka_consumer.client.kafka_client_factory import make_client
+from datadog_checks.kafka_consumer.client.generic_kafka_client import GenericKafkaClient
 from datadog_checks.kafka_consumer.config import KafkaConfig
 
 
@@ -25,8 +25,7 @@ def __init__(self, name, init_config, instances):
         super(KafkaCheck, self).__init__(name, init_config, instances)
         self.config = KafkaConfig(self.init_config, self.instance)
         self._context_limit = self.config._context_limit
-        tls_context = self.get_tls_context()
-        self.client = make_client(self.config, tls_context, self.log)
+        self.client = GenericKafkaClient(self.config, self.get_tls_context(), self.log)
         self.check_initializations.append(self.config.validate_config)
 
     def check(self, _):
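The new use_legacy_client option is read through is_affirmative, so both YAML booleans and the usual string spellings toggle it. A sketch of those semantics (an approximation for illustration, not the library's exact source):

    def is_affirmative(value):
        # Approximate semantics: booleans and ints pass through as truthiness,
        # strings are matched case-insensitively against common "yes" spellings.
        if value is None:
            return False
        if isinstance(value, (bool, int)):
            return bool(value)
        return str(value).strip().lower() in ('yes', 'true', '1', 'on')


    assert is_affirmative(True) and is_affirmative('true') and is_affirmative('YES') and is_affirmative('1')
    assert not is_affirmative(False) and not is_affirmative('false') and not is_affirmative(None)
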
diff --git a/kafka_consumer/hatch.toml b/kafka_consumer/hatch.toml
index 7d76d78335e7f..cb9a27625d16a 100644
--- a/kafka_consumer/hatch.toml
+++ b/kafka_consumer/hatch.toml
@@ -1,5 +1,20 @@
 [env.collectors.datadog-checks]
 
+[envs.default]
+dependencies = [
+  "datadog_checks_tests_helper @ {root:uri}/../datadog_checks_tests_helper",
+]
+e2e-env = false
+
+[envs.default.env-vars]
+ZK_VERSION = "3.6.4"
+LEGACY_CLIENT = "false"
+
+[[envs.default.matrix]]
+python = ["3.8"]
+version = ["1.1", "2.3", "3.3"]
+impl = ["legacy"]
+
 [[envs.default.matrix]]
 python = ["3.8"]
 version = ["1.1", "2.3", "3.3"]
@@ -10,15 +25,12 @@ matrix.version.env-vars = [
   { key = "KAFKA_VERSION", value = "1.1.1", if = ["1.1"] },
   { key = "KAFKA_VERSION", value = "2.3.1", if = ["2.3"] },
   { key = "KAFKA_VERSION", value = "3.3.2", if = ["3.3"] },
-  "ZK_VERSION=3.6.4"
 ]
-
-[envs.default]
-dependencies = [
-  "datadog_checks_tests_helper @ {root:uri}/../datadog_checks_tests_helper",
+matrix.impl.env-vars = [
+  { key = "LEGACY_CLIENT", value = "true", if = ["legacy"] },
 ]
-e2e-env = false
 
 [envs.latest.env-vars]
 KAFKA_VERSION = "latest"
 ZK_VERSION = "3.6.4"
+LEGACY_CLIENT = "false"
diff --git a/kafka_consumer/tests/common.py b/kafka_consumer/tests/common.py
index 9d7621498e7bd..ae09b96d224f4 100644
--- a/kafka_consumer/tests/common.py
+++ b/kafka_consumer/tests/common.py
@@ -4,6 +4,7 @@
 import os
 import socket
 
+from datadog_checks.base import is_affirmative
 from datadog_checks.dev import get_docker_hostname
 from datadog_checks.dev.utils import get_metadata_metrics
 
@@ -17,6 +18,7 @@
 KAFKA_VERSION = os.environ.get('KAFKA_VERSION')
 BROKER_METRICS = ['kafka.broker_offset']
 CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag']
+LEGACY_CLIENT = is_affirmative(os.environ.get('LEGACY_CLIENT', 'false'))
 
 metrics = BROKER_METRICS + CONSUMER_METRICS
 
diff --git a/kafka_consumer/tests/conftest.py b/kafka_consumer/tests/conftest.py
index ba47b4689fc67..d11c7a366d2a9 100644
--- a/kafka_consumer/tests/conftest.py
+++ b/kafka_consumer/tests/conftest.py
@@ -13,7 +13,7 @@
 from datadog_checks.dev import WaitFor, docker_run
 from datadog_checks.kafka_consumer import KafkaCheck
 
-from .common import DOCKER_IMAGE_PATH, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, TOPICS
+from .common import DOCKER_IMAGE_PATH, HERE, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, LEGACY_CLIENT, TOPICS
 from .runners import KConsumer, Producer
 
 # Dummy TLS certs
@@ -21,19 +21,28 @@
 cert = os.path.join(CERTIFICATE_DIR, 'cert.cert')
 private_key = os.path.join(CERTIFICATE_DIR, 'server.pem')
 
-E2E_METADATA = {
-    'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
-    'start_commands': [
-        'apt-get update',
-        'apt-get install -y build-essential',
-    ],
-}
+
+if LEGACY_CLIENT:
+    E2E_METADATA = {
+        'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
+        'start_commands': [
+            'apt-get update',
+            'apt-get install -y build-essential',
+        ],
+    }
+else:
+    E2E_METADATA = {
+        'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
+        'docker_volumes': [f'{HERE}/scripts/start_commands.sh:/tmp/start_commands.sh'],
+        'start_commands': ['bash /tmp/start_commands.sh'],
+    }
 
 INSTANCE = {
     'kafka_connect_str': KAFKA_CONNECT_STR,
     'tags': ['optional:tag1'],
     'consumer_groups': {'my_consumer': {'marvel': [0]}},
     'broker_requests_batch_size': 1,
+    'use_legacy_client': LEGACY_CLIENT,
 }
 
 
@@ -81,6 +90,7 @@ def kafka_instance_tls():
         'tls_cert': cert,
         'tls_private_key': private_key,
         'tls_ca_cert': CERTIFICATE_DIR,
+        'use_legacy_client': LEGACY_CLIENT,
     }
 
 
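With hatch exporting LEGACY_CLIENT per matrix entry, common.py parsing it, and conftest.py forwarding it into the instance, the backend selection is testable without a broker. A hypothetical unit test (not part of this PR, and assuming KafkaConfig accepts such a minimal instance dict):

    import logging

    from datadog_checks.kafka_consumer.client.generic_kafka_client import GenericKafkaClient
    from datadog_checks.kafka_consumer.config import KafkaConfig


    def test_use_legacy_client_selects_python_backend():
        # With use_legacy_client=True, the confluent client is never built.
        config = KafkaConfig({}, {'kafka_connect_str': 'localhost:9092', 'use_legacy_client': True})
        client = GenericKafkaClient(config, None, logging.getLogger(__name__))
        assert client.use_legacy_client
        assert client.confluent_kafka_client is None
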
diff --git a/kafka_consumer/tests/scripts/start_commands.sh b/kafka_consumer/tests/scripts/start_commands.sh
new file mode 100644
index 0000000000000..7296e31e1d87f
--- /dev/null
+++ b/kafka_consumer/tests/scripts/start_commands.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+# TODO Remove this script once the library is installed at the agent level
+
+apt-get update
+apt-get install -y --no-install-recommends gcc git libssl-dev g++ make build-essential libsasl2-modules-gssapi-mit krb5-user
+cd /tmp && git clone https://github.com/edenhill/librdkafka.git
+cd librdkafka && git checkout tags/v2.0.2
+./configure && make && make install && ldconfig
+cd ../ && rm -rf librdkafka
+pip install --no-binary confluent-kafka confluent-kafka
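Since the script compiles librdkafka v2.0.2 from source and installs confluent-kafka with --no-binary, the build can be sanity-checked from Python once the container is up (a verification snippet, not part of the PR):

    import confluent_kafka

    # version() reports the Python binding, libversion() the underlying
    # librdkafka; the latter should reflect the v2.0.2 checkout above.
    print(confluent_kafka.version())
    print(confluent_kafka.libversion())
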