Skip to content

Commit

Permalink
Create environments for the new kafka client
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorentClarret committed Feb 24, 2023
1 parent e35d80b commit 8174e9e
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,36 @@
# (C) Datadog, Inc. 2023-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient


class ConfluentKafkaClient:
def __init__(self) -> None:
pass
class ConfluentKafkaClient(KafkaClient):
def create_kafka_admin_client(self):
raise NotImplementedError

def get_consumer_offsets_dict(self):
raise NotImplementedError

def get_highwater_offsets(self):
raise NotImplementedError

def get_highwater_offsets_dict(self):
raise NotImplementedError

def reset_offsets(self):
raise NotImplementedError

def get_partitions_for_topic(self, topic):
raise NotImplementedError

def request_metadata_update(self):
raise NotImplementedError

def collect_broker_version(self):
raise NotImplementedError

def get_consumer_offsets(self):
pass
raise NotImplementedError

def get_broker_offset(self):
pass
raise NotImplementedError
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# (C) Datadog, Inc. 2023-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

from datadog_checks.kafka_consumer.client.confluent_kafka_client import ConfluentKafkaClient
from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient
from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient


class GenericKafkaClient(KafkaClient):
def __init__(self, config, tls_context, log) -> None:
super().__init__(config, tls_context, log)
self.use_legacy_client = config.use_legacy_client
self.confluent_kafka_client = ConfluentKafkaClient(config, tls_context, log) if not self.use_legacy_client else None
self.python_kafka_client = KafkaPythonClient(config, tls_context, log)

def get_consumer_offsets(self):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
# return self.python_kafka_client.get_consumer_offsets()
# return self.confluent_kafka_client.get_consumer_offsets()

return self.python_kafka_client.get_consumer_offsets()

def get_highwater_offsets(self):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
# return self.python_kafka_client.get_highwater_offsets()
# return self.confluent_kafka_client.get_highwater_offsets()
return self.python_kafka_client.get_highwater_offsets()

def get_highwater_offsets_dict(self):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
# return self.python_kafka_client.get_highwater_offsets_dict()
# return self.confluent_kafka_client.get_highwater_offsets_dict()
return self.python_kafka_client.get_highwater_offsets_dict()

def reset_offsets(self):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
# return self.python_kafka_client.reset_offsets()
# return self.confluent_kafka_client.reset_offsets()
return self.python_kafka_client.reset_offsets()

def get_partitions_for_topic(self, topic):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
# return self.python_kafka_client.get_partitions_for_topic(topic)
# return self.confluent_kafka_client.get_partitions_for_topic(topic)
return self.python_kafka_client.get_partitions_for_topic(topic)

def request_metadata_update(self):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
# return self.python_kafka_client.request_metadata_update()
# return self.confluent_kafka_client.request_metadata_update()
return self.python_kafka_client.request_metadata_update()

def collect_broker_version(self):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
# return self.python_kafka_client.collect_broker_version()
# return self.confluent_kafka_client.collect_broker_version()
return self.python_kafka_client.collect_broker_version()

def get_consumer_offsets_dict(self):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
# return self.python_kafka_client.get_consumer_offsets_dict()
# return self.confluent_kafka_client.get_consumer_offsets_dict()
return self.python_kafka_client.get_consumer_offsets_dict()

def create_kafka_admin_client(self):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
# return self.python_kafka_client.get_consumer_offsets()
# return self.confluent_kafka_client.get_consumer_offsets()

return self.python_kafka_client.create_kafka_admin_client()
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,46 @@


class KafkaClient(ABC):
def __init__(self) -> None:
def __init__(self, config, tls_context, log) -> None:
self.config = config
self.log = log
self._kafka_client = None
self._highwater_offsets = {}
self._consumer_offsets = {}
self._tls_context = tls_context

@abstractmethod
def create_kafka_admin_client(self):
pass

@abstractmethod
def get_consumer_offsets(self):
pass

@abstractmethod
def get_consumer_offsets_dict(self):
pass

@abstractmethod
def get_highwater_offsets(self):
pass

@abstractmethod
def get_highwater_offsets_dict(self):
pass

@abstractmethod
def reset_offsets(self):
pass

@abstractmethod
def get_partitions_for_topic(self, topic):
pass

@abstractmethod
def request_metadata_update(self):
pass

@abstractmethod
def collect_broker_version(self):
pass

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ def token(self):


class KafkaPythonClient(KafkaClient):
def __init__(self, config, tls_context, log) -> None:
self.config = config
self.log = log
self._kafka_client = None
self._highwater_offsets = {}
self._consumer_offsets = {}
self._tls_context = tls_context

def get_consumer_offsets(self):
"""Fetch Consumer Group offsets from Kafka.
Expand Down
1 change: 1 addition & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, init_config, instance) -> None:
self._sasl_kerberos_service_name = instance.get('sasl_kerberos_service_name', 'kafka')
self._sasl_kerberos_domain_name = instance.get('sasl_kerberos_domain_name')
self._sasl_oauth_token_provider = instance.get('sasl_oauth_token_provider')
self.use_legacy_client = is_affirmative(instance.get('use_legacy_client', False))

def validate_config(self):
if not self._kafka_connect_str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from time import time

from datadog_checks.base import AgentCheck
from datadog_checks.kafka_consumer.client.kafka_client_factory import make_client
from datadog_checks.kafka_consumer.client.generic_kafka_client import GenericKafkaClient
from datadog_checks.kafka_consumer.config import KafkaConfig


Expand All @@ -25,8 +25,7 @@ def __init__(self, name, init_config, instances):
super(KafkaCheck, self).__init__(name, init_config, instances)
self.config = KafkaConfig(self.init_config, self.instance)
self._context_limit = self.config._context_limit
tls_context = self.get_tls_context()
self.client = make_client(self.config, tls_context, self.log)
self.client = GenericKafkaClient(self.config, self.get_tls_context(), self.log)
self.check_initializations.append(self.config.validate_config)

def check(self, _):
Expand Down
24 changes: 18 additions & 6 deletions kafka_consumer/hatch.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
[env.collectors.datadog-checks]

[envs.default]
dependencies = [
"datadog_checks_tests_helper @ {root:uri}/../datadog_checks_tests_helper",
]
e2e-env = false

[envs.default.env-vars]
ZK_VERSION = "3.6.4"
LEGACY_CLIENT = "false"

[[envs.default.matrix]]
python = ["3.8"]
version = ["1.1", "2.3", "3.3"]
impl = ["legacy"]

[[envs.default.matrix]]
python = ["3.8"]
version = ["1.1", "2.3", "3.3"]
Expand All @@ -10,15 +25,12 @@ matrix.version.env-vars = [
{ key = "KAFKA_VERSION", value = "1.1.1", if = ["1.1"] },
{ key = "KAFKA_VERSION", value = "2.3.1", if = ["2.3"] },
{ key = "KAFKA_VERSION", value = "3.3.2", if = ["3.3"] },
"ZK_VERSION=3.6.4"
]

[envs.default]
dependencies = [
"datadog_checks_tests_helper @ {root:uri}/../datadog_checks_tests_helper",
matrix.impl.env-vars = [
{ key = "LEGACY_CLIENT", value = "true", if = ["legacy"] },
]
e2e-env = false

[envs.latest.env-vars]
KAFKA_VERSION = "latest"
ZK_VERSION = "3.6.4"
LEGACY_CLIENT = "false"
2 changes: 2 additions & 0 deletions kafka_consumer/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import socket

from datadog_checks.base import is_affirmative
from datadog_checks.dev import get_docker_hostname
from datadog_checks.dev.utils import get_metadata_metrics

Expand All @@ -17,6 +18,7 @@
KAFKA_VERSION = os.environ.get('KAFKA_VERSION')
BROKER_METRICS = ['kafka.broker_offset']
CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag']
LEGACY_CLIENT = is_affirmative(os.environ.get('LEGACY_CLIENT', 'false'))

metrics = BROKER_METRICS + CONSUMER_METRICS

Expand Down
26 changes: 18 additions & 8 deletions kafka_consumer/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,36 @@
from datadog_checks.dev import WaitFor, docker_run
from datadog_checks.kafka_consumer import KafkaCheck

from .common import DOCKER_IMAGE_PATH, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, TOPICS
from .common import DOCKER_IMAGE_PATH, HERE, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, LEGACY_CLIENT, TOPICS
from .runners import KConsumer, Producer

# Dummy TLS certs
CERTIFICATE_DIR = os.path.join(os.path.dirname(__file__), 'certificate')
cert = os.path.join(CERTIFICATE_DIR, 'cert.cert')
private_key = os.path.join(CERTIFICATE_DIR, 'server.pem')

E2E_METADATA = {
'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
'start_commands': [
'apt-get update',
'apt-get install -y build-essential',
],
}

if LEGACY_CLIENT:
E2E_METADATA = {
'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
'start_commands': [
'apt-get update',
'apt-get install -y build-essential',
],
}
else:
E2E_METADATA = {
'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')],
'docker_volumes': [f'{HERE}/scripts/start_commands.sh:/tmp/start_commands.sh'],
'start_commands': ['bash /tmp/start_commands.sh'],
}

INSTANCE = {
'kafka_connect_str': KAFKA_CONNECT_STR,
'tags': ['optional:tag1'],
'consumer_groups': {'my_consumer': {'marvel': [0]}},
'broker_requests_batch_size': 1,
'use_legacy_client': LEGACY_CLIENT,
}


Expand Down Expand Up @@ -81,6 +90,7 @@ def kafka_instance_tls():
'tls_cert': cert,
'tls_private_key': private_key,
'tls_ca_cert': CERTIFICATE_DIR,
'use_legacy_client': LEGACY_CLIENT,
}


Expand Down
11 changes: 11 additions & 0 deletions kafka_consumer/tests/scripts/start_commands.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

# TODO Remove this script once the library is installed at the agent level

apt-get update
apt-get install -y --no-install-recommends gcc git libssl-dev g++ make build-essential libsasl2-modules-gssapi-mit krb5-user
cd /tmp && git clone https://github.com/edenhill/librdkafka.git
cd librdkafka && git checkout tags/v2.0.2
./configure && make && make install && ldconfig
cd ../ && rm -rf librdkafka
pip install --no-binary confluent-kafka confluent-kafka

0 comments on commit 8174e9e

Please sign in to comment.