Skip to content

Commit

Permalink
Drop iteritems & text_type (#4379)
Browse files Browse the repository at this point in the history
With the impending move to python 3, drop `iteritems` in favor of just
using `items`. The perf hit is minimal given the size of the data, and
just makes the code cleaner now that my eyes are used to python 3 code.

Additionally, `text_type` doesn't really make sense here... it's already
a string, no need to recast to a string.
  • Loading branch information
jeffwidman authored and ofek committed Aug 15, 2019
1 parent 177925e commit 9050d57
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from kafka.structs import TopicPartition
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from six import iteritems, string_types, text_type
from six import string_types

from datadog_checks.base import AgentCheck, ConfigurationError, is_affirmative

Expand Down Expand Up @@ -100,8 +100,8 @@ def check(self, instance):

if not topics:
# val = {'consumer_group': {'topic': [0, 1]}}
for _, tps in iteritems(consumer_groups):
for topic, partitions in iteritems(tps):
for _, tps in consumer_groups.items():
for topic, partitions in tps.items():
topics[topic].update(partitions)

warn_msg = """ Discovered %s partition contexts - this exceeds the maximum
Expand All @@ -123,7 +123,7 @@ def check(self, instance):
return

# Report the broker highwater offset
for (topic, partition), highwater_offset in iteritems(highwater_offsets):
for (topic, partition), highwater_offset in highwater_offsets.items():
broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
broker_tags.extend(self._custom_tags)
self.gauge('kafka.broker_offset', highwater_offset, tags=broker_tags)
Expand Down Expand Up @@ -244,28 +244,28 @@ def _get_broker_offsets(self, topics):
topic_partitions_without_a_leader = []
topics_to_fetch = defaultdict(set)

for topic, partitions in iteritems(topics):
for topic, partitions in topics.items():
# if no partitions are provided
# we're falling back to all available partitions (?)
if len(partitions) == 0:
partitions = self._kafka_client.cluster.available_partitions_for_topic(topic)
topics_to_fetch[topic].update(partitions)

leader_tp = defaultdict(lambda: defaultdict(set))
for topic, partitions in iteritems(topics_to_fetch):
for topic, partitions in topics_to_fetch.items():
for partition in partitions:
partition_leader = self._kafka_client.cluster.leader_for_partition(TopicPartition(topic, partition))
if partition_leader is not None and partition_leader >= 0:
leader_tp[partition_leader][topic].add(partition)

max_offsets = 1
for node_id, tps in iteritems(leader_tp):
for node_id, tps in leader_tp.items():
# Construct the OffsetRequest
request = OffsetRequest[0](
replica_id=-1,
topics=[
(topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions])
for topic, partitions in iteritems(tps)
for topic, partitions in tps.items()
],
)

Expand All @@ -281,7 +281,7 @@ def _report_consumer_metrics(
):
if unled_topic_partitions is None:
unled_topic_partitions = []
for (consumer_group, topic, partition), consumer_offset in iteritems(consumer_offsets):
for (consumer_group, topic, partition), consumer_offset in consumer_offsets.items():
# Report the consumer group offsets and consumer lag
if (topic, partition) not in highwater_offsets:
self.log.warn(
Expand Down Expand Up @@ -364,14 +364,14 @@ def _get_zk_consumer_offsets(self, consumer_groups=None):
for consumer_group in self._get_zk_path_children(zk_path_consumer, 'consumer groups')
}

for consumer_group, topics in iteritems(consumer_groups):
for consumer_group, topics in consumer_groups.items():
if not topics:
# If topics are't specified, fetch them from ZK
zk_path_topics = zk_path_topic_tmpl.format(group=consumer_group)
topics = {topic: None for topic in self._get_zk_path_children(zk_path_topics, 'topics')}
consumer_groups[consumer_group] = topics

for topic, partitions in iteritems(topics):
for topic, partitions in topics.items():
if partitions:
partitions = set(partitions) # defend against bad user input
else:
Expand Down Expand Up @@ -406,10 +406,10 @@ def _get_kafka_consumer_offsets(self, instance, consumer_groups):
consumer_offsets = {}
topics = defaultdict(set)

for consumer_group, topic_partitions in iteritems(consumer_groups):
for consumer_group, topic_partitions in consumer_groups.items():
try:
single_group_offsets = self._get_single_group_offsets_from_kafka(consumer_group, topic_partitions)
for (topic, partition), offset in iteritems(single_group_offsets):
for (topic, partition), offset in single_group_offsets.items():
topics[topic].update([partition])
key = (consumer_group, topic, partition)
consumer_offsets[key] = offset
Expand All @@ -430,17 +430,17 @@ def _get_single_group_offsets_from_kafka(self, consumer_group, topic_partitions)
"""Get offsets for a single consumer group from Kafka"""
consumer_offsets = {}
tps = defaultdict(set)
for topic, partitions in iteritems(topic_partitions):
for topic, partitions in topic_partitions.items():
if len(partitions) == 0:
partitions = self._kafka_client.cluster.available_partitions_for_topic(topic)
tps[topic] = tps[text_type(topic)].union(set(partitions))
tps[topic].update(partitions)

coordinator_id = self._get_group_coordinator(consumer_group)
if coordinator_id is not None:
# Kafka protocol uses OffsetFetchRequests to retrieve consumer offsets:
# https://kafka.apache.org/protocol#The_Messages_OffsetFetch
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetFetchRequest
request = OffsetFetchRequest[1](consumer_group, list(iteritems(tps)))
request = OffsetFetchRequest[1](consumer_group, tps.items())
response = self._make_blocking_req(request, node_id=coordinator_id)
for (topic, partition_offsets) in response.topics:
for partition, offset, _, error_code in partition_offsets:
Expand All @@ -463,12 +463,12 @@ def _validate_explicit_consumer_groups(cls, val):
val = {'consumer_group': {'topic': [0, 1]}}
"""
assert isinstance(val, dict)
for consumer_group, topics in iteritems(val):
for consumer_group, topics in val.items():
assert isinstance(consumer_group, string_types)
# topics are optional
assert isinstance(topics, dict) or topics is None
if topics is not None:
for topic, partitions in iteritems(topics):
for topic, partitions in topics.items():
assert isinstance(topic, string_types)
# partitions are optional
assert isinstance(partitions, (list, tuple)) or partitions is None
Expand Down

0 comments on commit 9050d57

Please sign in to comment.