Skip to content

Commit

Permalink
Move validation of correct consumer group configuration to Config class
Browse files Browse the repository at this point in the history
Since it's a configuration concern, that's where it seems to belong,
and the behavior would thus be consistent with where other
`ConfigurationError`s are raised.
  • Loading branch information
alopezz committed Apr 18, 2023
1 parent 8385026 commit 6e307dd
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 46 deletions.
6 changes: 0 additions & 6 deletions kafka_consumer/datadog_checks/kafka_consumer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, KafkaException, TopicPartition
from confluent_kafka.admin import AdminClient

from datadog_checks.base import ConfigurationError
from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS


Expand Down Expand Up @@ -173,11 +172,6 @@ def _get_consumer_groups(self):
return consumer_groups
elif self.config._consumer_groups:
return self.config._consumer_groups
else:
raise ConfigurationError(
"Cannot fetch consumer offsets because no consumer_groups are specified and "
"monitor_unlisted_consumer_groups is %s." % self.config._monitor_unlisted_consumer_groups
)

def _get_consumer_offset_futures(self, consumer_groups):
topics = self.kafka_client.list_topics(timeout=self.config._request_timeout)
Expand Down
7 changes: 7 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ def validate_config(self):

if self._consumer_groups and self._consumer_groups_regex:
self.log.warning("Using consumer_groups and consumer_groups_regex, will combine the two config options.")

if not (self._monitor_unlisted_consumer_groups or self._consumer_groups or self._consumer_groups_regex):
raise ConfigurationError(
"Cannot fetch consumer offsets because no consumer_groups are specified and "
"monitor_unlisted_consumer_groups is %s." % self._monitor_unlisted_consumer_groups
)

self._validate_consumer_groups()

def _compile_regex(self, consumer_groups_regex):
Expand Down
30 changes: 10 additions & 20 deletions kafka_consumer/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ def test_check_kafka_metrics_limit(aggregator, check, kafka_instance, dd_run_che
assert len(aggregator._metrics) == 1


def test_consumer_config_error(caplog, check, dd_run_check, kafka_instance):
def test_consumer_config_error(check, dd_run_check, kafka_instance):
del kafka_instance['consumer_groups']
kafka_consumer_check = check(kafka_instance)

dd_run_check(kafka_consumer_check, extract_message=True)
assert 'monitor_unlisted_consumer_groups is False' in caplog.text
with pytest.raises(Exception, match="monitor_unlisted_consumer_groups is False"):
dd_run_check(kafka_consumer_check, extract_message=True)


def test_no_topics(aggregator, check, kafka_instance, dd_run_check):
Expand Down Expand Up @@ -91,88 +91,78 @@ def test_monitor_broker_highwatermarks(


@pytest.mark.parametrize(
'override, expected_exception, exception_msg, metric_count',
'override, expected_exception, metric_count',
[
pytest.param(
{'kafka_connect_str': 12},
pytest.raises(
Exception, match='ConfigurationError: `kafka_connect_str` should be string or list of strings'
),
'',
0,
id="Invalid Non-string kafka_connect_str",
),
pytest.param(
{'consumer_groups': {}},
does_not_raise(),
'ConfigurationError: Cannot fetch consumer offsets because no consumer_groups are specified and '
'monitor_unlisted_consumer_groups is False',
pytest.raises(
Exception,
match='ConfigurationError: Cannot fetch consumer offsets because no consumer_groups are specified and '
'monitor_unlisted_consumer_groups is False',
),
0,
id="Empty consumer_groups",
),
pytest.param(
{'kafka_connect_str': None},
pytest.raises(Exception, match='kafka_connect_str\n none is not an allowed value'),
'',
0,
id="Invalid Nonetype kafka_connect_str",
),
pytest.param(
{'kafka_connect_str': ['localhost:9092', 'localhost:9093'], 'monitor_unlisted_consumer_groups': True},
does_not_raise(),
'',
4,
id="Valid list kafka_connect_str",
),
pytest.param(
{'monitor_unlisted_consumer_groups': True},
does_not_raise(),
'',
4,
id="Valid str kafka_connect_str",
),
pytest.param(
{'consumer_groups': {}, 'monitor_unlisted_consumer_groups': True},
does_not_raise(),
'',
4,
id="Empty consumer_groups and monitor_unlisted_consumer_groups true",
),
pytest.param(
{'consumer_groups': {'my_consumer': None}},
does_not_raise(),
'',
4,
id="One consumer group, all topics and partitions",
),
pytest.param(
{'consumer_groups': {'my_consumer': {'marvel': None}}},
does_not_raise(),
'',
2,
id="One consumer group, one topic, all partitions",
),
pytest.param(
{'consumer_groups': {'my_consumer': {'marvel': [1]}}},
does_not_raise(),
'',
1,
id="One consumer group, one topic, one partition",
),
],
)
def test_config(
dd_run_check, check, kafka_instance, override, aggregator, expected_exception, exception_msg, metric_count, caplog
):
caplog.set_level(logging.DEBUG)
def test_config(dd_run_check, check, kafka_instance, override, aggregator, expected_exception, metric_count):
kafka_instance.update(override)
with expected_exception:
dd_run_check(check(kafka_instance))

for m in metrics:
aggregator.assert_metric(m, count=metric_count)

assert exception_msg in caplog.text
aggregator.assert_metrics_using_metadata(get_metadata_metrics())


Expand Down
20 changes: 0 additions & 20 deletions kafka_consumer/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
import mock
import pytest

from datadog_checks.dev.utils import get_metadata_metrics
from datadog_checks.kafka_consumer import KafkaCheck
from tests.common import metrics

pytestmark = [pytest.mark.unit]

Expand All @@ -32,24 +30,6 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, check, kafka_inst
assert expected_http_kwargs == actual_options


def test_invalid_connect_str(dd_run_check, check, aggregator, caplog, kafka_instance):
caplog.set_level(logging.DEBUG)
kafka_instance['kafka_connect_str'] = 'invalid'
del kafka_instance['consumer_groups']
dd_run_check(check(kafka_instance))

for m in metrics:
aggregator.assert_metric(m, count=0)

exception_msg = (
'ConfigurationError: Cannot fetch consumer offsets because no consumer_groups are specified and '
'monitor_unlisted_consumer_groups is False'
)

assert exception_msg in caplog.text
aggregator.assert_metrics_using_metadata(get_metadata_metrics())


@pytest.mark.parametrize(
'sasl_oauth_token_provider, expected_exception, mocked_admin_client',
[
Expand Down

0 comments on commit 6e307dd

Please sign in to comment.