Skip to content

Commit

Permalink
move tls_context back into check
Browse files Browse the repository at this point in the history
  • Loading branch information
fanny-jiang committed Feb 16, 2023
1 parent 1efecf5 commit 318195b
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@


class KafkaClient(ABC):
def __init__(self, check) -> None:
def __init__(self, check, tls_context) -> None:
self.check = check
self.log = check.log
self._kafka_client = None
self._highwater_offsets = {}
self._consumer_offsets = {}
self._context_limit = check._context_limit
self._tls_context = tls_context

def should_get_highwater_offsets(self):
return len(self._consumer_offsets) < self._context_limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient


def make_client(check, config) -> KafkaClient:
return KafkaPythonClient(check, config)
def make_client(check, config, tls_context) -> KafkaClient:
return KafkaPythonClient(check, config, tls_context)
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ def token(self):


class KafkaPythonClient(KafkaClient):
def __init__(self, check, config) -> None:
def __init__(self, check, config, tls_context) -> None:
self.check = check
self.config = config
self.log = check.log
self._kafka_client = None
self._highwater_offsets = {}
self._consumer_offsets = {}
self._context_limit = check._context_limit
self._tls_context = tls_context

def get_consumer_offsets(self):
"""Fetch Consumer Group offsets from Kafka.
Expand Down Expand Up @@ -173,11 +174,10 @@ def _create_kafka_client(self, clazz):
if isinstance(kafka_version, str):
kafka_version = tuple(map(int, kafka_version.split(".")))

tls_context = self.check.get_tls_context()
crlfile = self.check.instance.get('ssl_crlfile', self.check.instance.get('tls_crlfile'))
if crlfile:
tls_context.load_verify_locations(crlfile)
tls_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
self._tls_context.load_verify_locations(crlfile)
self._tls_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

return clazz(
bootstrap_servers=kafka_connect_str,
Expand All @@ -200,7 +200,7 @@ def _create_kafka_client(self, clazz):
if 'sasl_oauth_token_provider' in self.check.instance
else None
),
ssl_context=tls_context,
ssl_context=self._tls_context,
)

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def __init__(self, name, init_config, instances):
)
self._consumer_groups = self.instance.get('consumer_groups', {})
self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE)
self.client = make_client(self, self.config)
tls_context = self.get_tls_context()
self.client = make_client(self, self.config, tls_context)

def check(self, _):
"""The main entrypoint of the check."""
Expand Down

0 comments on commit 318195b

Please sign in to comment.