Skip to content

Commit

Permalink
revert client param
Browse files Browse the repository at this point in the history
  • Loading branch information
fanny-jiang committed Feb 22, 2023
1 parent 22387e7 commit e35d80b
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@


class ConfluentKafkaClient:
def __init__(self, config, tls_context, log) -> None:
self.config = config
self.log = log
self._tls_context = tls_context
def __init__(self) -> None:
    """Initialize the confluent-kafka based client (currently a stub)."""

def get_consumer_offsets(self):
    """Fetch consumer group offsets — not implemented yet; returns None."""
    return None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
# Licensed under a 3-clause BSD style license (see LICENSE)

from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient
from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient


def make_client(client, config, tls_context, log) -> KafkaClient:
return client(config, tls_context, log)
def make_client(config, tls_context, log) -> KafkaClient:
    """Build the Kafka client used by the check.

    Always constructs a KafkaPythonClient wired with the given check
    config, TLS context, and logger (the confluent-kafka implementation
    is not selectable here).
    """
    python_client = KafkaPythonClient(config, tls_context, log)
    return python_client
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from datadog_checks.base import AgentCheck
from datadog_checks.kafka_consumer.client.kafka_client_factory import make_client
from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient
from datadog_checks.kafka_consumer.config import KafkaConfig


Expand All @@ -22,12 +21,12 @@ class KafkaCheck(AgentCheck):
'ssl_password': {'name': 'tls_private_key_password'},
}

def __init__(self, name, init_config, instances, client=KafkaPythonClient):
def __init__(self, name, init_config, instances):
super(KafkaCheck, self).__init__(name, init_config, instances)
self.config = KafkaConfig(self.init_config, self.instance)
self._context_limit = self.config._context_limit
tls_context = self.get_tls_context()
self.client = make_client(client, self.config, tls_context, self.log)
self.client = make_client(self.config, tls_context, self.log)
self.check_initializations.append(self.config.validate_config)

def check(self, _):
Expand Down
5 changes: 1 addition & 4 deletions kafka_consumer/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

from datadog_checks.dev import WaitFor, docker_run
from datadog_checks.kafka_consumer import KafkaCheck
from datadog_checks.kafka_consumer.client.confluent_kafka_client import ConfluentKafkaClient

from .common import DOCKER_IMAGE_PATH, HOST_IP, KAFKA_CONNECT_STR, KAFKA_VERSION, TOPICS
from .runners import KConsumer, Producer
Expand Down Expand Up @@ -62,9 +61,7 @@ def dd_environment(mock_local_kafka_hosts_dns):

@pytest.fixture
def check():
return lambda instance, init_config=None, client=None: KafkaCheck(
'kafka_consumer', init_config or {}, [instance], client or ConfluentKafkaClient
)
return lambda instance, init_config=None: KafkaCheck('kafka_consumer', init_config or {}, [instance])


@pytest.fixture
Expand Down
17 changes: 8 additions & 9 deletions kafka_consumer/tests/python_client/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import pytest

from datadog_checks.dev.utils import get_metadata_metrics
from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient

from ..common import KAFKA_CONNECT_STR, assert_check_kafka

Expand All @@ -15,15 +14,15 @@ def test_check_kafka(aggregator, check, kafka_instance, dd_run_check):
"""
Testing Kafka_consumer check.
"""
dd_run_check(check(kafka_instance, client=KafkaPythonClient))
dd_run_check(check(kafka_instance))
assert_check_kafka(aggregator, kafka_instance['consumer_groups'])


def test_can_send_event(aggregator, check, kafka_instance, dd_run_check):
"""
Testing Kafka_consumer check.
"""
kafka_consumer_check = check(kafka_instance, client=KafkaPythonClient)
kafka_consumer_check = check(kafka_instance)
kafka_consumer_check.send_event("test", "test", [], "test", "test")
aggregator.assert_event("test", exact_match=False, count=1)

Expand All @@ -32,35 +31,35 @@ def test_check_kafka_metrics_limit(aggregator, check, kafka_instance, dd_run_che
"""
Testing Kafka_consumer check.
"""
dd_run_check(check(kafka_instance, {'max_partition_contexts': 1}, client=KafkaPythonClient))
dd_run_check(check(kafka_instance, {'max_partition_contexts': 1}))

assert len(aggregator._metrics) == 1


def test_consumer_config_error(caplog, check, dd_run_check):
instance = {'kafka_connect_str': KAFKA_CONNECT_STR, 'tags': ['optional:tag1']}
kafka_consumer_check = check(instance, client=KafkaPythonClient)
kafka_consumer_check = check(instance)

dd_run_check(kafka_consumer_check, extract_message=True)
assert 'monitor_unlisted_consumer_groups is False' in caplog.text


def test_no_topics(aggregator, check, kafka_instance, dd_run_check):
kafka_instance['consumer_groups'] = {'my_consumer': {}}
dd_run_check(check(kafka_instance, client=KafkaPythonClient))
dd_run_check(check(kafka_instance))

assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})


def test_no_partitions(aggregator, check, kafka_instance, dd_run_check):
kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}}
dd_run_check(check(kafka_instance, client=KafkaPythonClient))
dd_run_check(check(kafka_instance))

assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})


def test_version_metadata(datadog_agent, check, kafka_instance, dd_run_check):
kafka_consumer_check = check(kafka_instance, client=KafkaPythonClient)
kafka_consumer_check = check(kafka_instance)
kafka_consumer_check.check_id = 'test:123'

kafka_client = kafka_consumer_check.client.create_kafka_admin_client()
Expand All @@ -87,7 +86,7 @@ def test_monitor_broker_highwatermarks(dd_run_check, check, aggregator, is_enabl
'consumer_groups': {'my_consumer': {'marvel': None}},
'monitor_all_broker_highwatermarks': is_enabled,
}
dd_run_check(check(instance, client=KafkaPythonClient))
dd_run_check(check(instance))

# After refactor and library migration, write unit tests to assert expected metric values
aggregator.assert_metric('kafka.broker_offset', count=metric_count)
Expand Down
14 changes: 7 additions & 7 deletions kafka_consumer/tests/python_client/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from datadog_checks.base import ConfigurationError
from datadog_checks.dev.utils import get_metadata_metrics
from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient, OAuthTokenProvider
from datadog_checks.kafka_consumer.client.kafka_python_client import OAuthTokenProvider

from ..common import KAFKA_CONNECT_STR, metrics

Expand All @@ -25,7 +25,7 @@ def test_gssapi(kafka_instance, dd_run_check, check):
# assert the check doesn't fail with:
# Exception: Could not find main GSSAPI shared library.
with pytest.raises(Exception, match='check_version'):
dd_run_check(check(instance, client=KafkaPythonClient))
dd_run_check(check(instance))


def test_tls_config_ok(check, kafka_instance_tls):
Expand All @@ -34,7 +34,7 @@ def test_tls_config_ok(check, kafka_instance_tls):
tls_context = mock.MagicMock()
ssl.SSLContext.return_value = tls_context

kafka_consumer_check = check(kafka_instance_tls, client=KafkaPythonClient)
kafka_consumer_check = check(kafka_instance_tls)
with mock.patch('datadog_checks.kafka_consumer.KafkaCheck.get_tls_context', return_value=tls_context):
assert kafka_consumer_check.client._tls_context == tls_context
assert kafka_consumer_check.client._tls_context.check_hostname is True
Expand Down Expand Up @@ -81,7 +81,7 @@ def test_oauth_config(sasl_oauth_token_provider, check, expected_exception):
instance.update(sasl_oauth_token_provider)

with expected_exception:
check(instance, client=KafkaPythonClient).check(instance)
check(instance).check(instance)


def test_oauth_token_client_config(check, kafka_instance):
Expand All @@ -95,7 +95,7 @@ def test_oauth_token_client_config(check, kafka_instance):
}

with mock.patch('kafka.KafkaAdminClient') as kafka_admin_client:
kafka_consumer_check = check(kafka_instance, client=KafkaPythonClient)
kafka_consumer_check = check(kafka_instance)
kafka_consumer_check.client._create_kafka_client(clazz=kafka_admin_client)
params = kafka_admin_client.call_args_list[0].kwargs

Expand All @@ -119,7 +119,7 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, check, kafka_inst
instance = kafka_instance
instance.update(extra_config)

kafka_consumer_check = check(instance, client=KafkaPythonClient)
kafka_consumer_check = check(instance)
kafka_consumer_check.get_tls_context()
actual_options = {
k: v for k, v in kafka_consumer_check._tls_context_wrapper.config.items() if k in expected_http_kwargs
Expand Down Expand Up @@ -217,7 +217,7 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, check, kafka_inst
def test_config(dd_run_check, check, instance, aggregator, expected_exception, exception_msg, metric_count, caplog):
caplog.set_level(logging.DEBUG)
with expected_exception:
dd_run_check(check(instance, client=KafkaPythonClient))
dd_run_check(check(instance))

for m in metrics:
aggregator.assert_metric(m, count=metric_count)
Expand Down

0 comments on commit e35d80b

Please sign in to comment.