
[kafka_consumer] enable fetching consumer offsets from kafka #654

Merged
merged 20 commits into master from jaime/kafkaoffsets2 on Nov 7, 2017

Conversation

@truthbk (Member) commented Aug 4, 2017

What does this PR do?

This PR enables collection of Kafka consumer offsets from Kafka.

This PR aims to:

  • support collection of consumer offsets from Kafka, in addition to ZK;
  • avoid breaking backward compatibility by keeping a single metric space for both sets of offsets collected and their derived metrics (consumer_lag);
  • where available, still support self-discovery of consumer groups, topics, and partitions. Work is still ongoing to support this when consumer offsets are not written to ZK.

We can now collect the metrics from either source and append a source:kafka or source:zk tag accordingly, including on kafka.consumer_lag, so both should be available. For customers using one source or the other (consumer offsets in Kafka vs. ZooKeeper), all dashboards and monitors should continue to work as expected; no changes are needed. For people in a migration or mixed setup, we still have to evaluate the impact of having the two metrics. We are also considering an additional canonical tag to label the canonical offsets (again, for customers in the process of migrating).

Although the new KafkaClient has fairly solid support for an async implementation, that is currently out of scope.

Motivation

Support for newer Kafka versions and their new-style consumers, which store offsets in Kafka itself.

Testing Guidelines

An overview on testing is available in our contribution guidelines.

Versioning

  • Bumped the version check in manifest.json
  • Updated CHANGELOG.md

Additional Notes

This is still a WIP. I am trying to limit changes to the interface as much as possible while adding all the required support. The main configuration change thus far is the addition of a kafka_consumer_offsets boolean option at the instance level: if set, we will attempt to collect offsets from Kafka (as well as from ZK if zk_connect_str is still set in the YAML). If zk_connect_str is not set, we assume only Kafka offsets are wanted.
We have also introduced a kafka_retries option at the init level to configure the maximum number of retries we will make to Kafka after a failed query. Kafka sometimes goes through transient events such as a consumer coordinator change or a leader change, during which metadata collection may fail. Tweaking this value helps us be more resilient to those conditions. If we ultimately fail to collect the metrics, there's not much to worry about: these issues are transient and should be resolved by the next iteration.
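
For illustration, a minimal config sketch (kafka_consumer_offsets and kafka_retries are the options introduced by this PR; the connection strings and the consumer group/topic names are placeholder assumptions):

init_config:
  # maximum number of retries per failed Kafka query (introduced by this PR)
  kafka_retries: 3

instances:
  - kafka_connect_str: localhost:9092
    # collect consumer offsets from Kafka itself (introduced by this PR)
    kafka_consumer_offsets: true
    # optional: keep collecting ZK-stored offsets too; if omitted,
    # only Kafka-stored offsets are collected
    zk_connect_str: localhost:2181
    consumer_groups:
      my_consumer_group:
        my_topic: [0, 1, 2]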

@truthbk truthbk added this to the 5.17 milestone Aug 4, 2017
@truthbk truthbk removed this from the 5.17 milestone Aug 4, 2017
@truthbk truthbk force-pushed the jaime/kafkaoffsets2 branch 2 times, most recently from 81655e6 to 1fe3b77 on August 4, 2017 16:17
@truthbk truthbk added this to the 5.19 milestone Oct 4, 2017
@truthbk truthbk force-pushed the jaime/kafkaoffsets2 branch from 1fe3b77 to 4706bab on October 13, 2017 19:02
@truthbk truthbk force-pushed the jaime/kafkaoffsets2 branch from 4706bab to c519fe8 on October 13, 2017 19:11
@truthbk truthbk changed the base branch from jaime/kafkacli to master October 13, 2017 19:46
@truthbk truthbk force-pushed the jaime/kafkaoffsets2 branch from c519fe8 to aa9f015 on October 13, 2017 19:47
@truthbk truthbk changed the title [kafka_consumer][wip] enable fetching consumer offsets from kafka [kafka_consumer] enable fetching consumer offsets from kafka Oct 16, 2017
self.gauge('kafka.consumer_offset', consumer_offset, tags=consumer_group_tags)

consumer_lag = highwater_offsets[(topic, partition)] - consumer_offset
if consumer_lag < 0:
@truthbk (Member Author) commented Oct 16, 2017

I was never a fan of this - I always considered this to be the business of a monitor on our backend (defined via the UI), it clutters the check with something we should probably be doing elsewhere. There was some push from the community to include this, but there was also some disagreement over its real value. If we all agree, and this hasn't been released in any form in any previous PR (can't remember off the top of my head), I'd favor removing it.

@jeffwidman (Contributor) commented Oct 17, 2017

This check was the first thing to notify me of problems in our production cluster the last two times it went off, both in the last month. Theoretically we shouldn't have had these problems as this should be impossible with a healthy Kafka cluster, but we did due to human mistakes, so I was very thankful for this.

I always considered this to be the business of a monitor on the backend

I don't understand what you mean here?

At the end of the day, I don't care where the check is located, I just want it to be emitted somehow by default so I don't have to remember to manually configure it within datadog.

@jeffwidman (Contributor) left a comment

Thanks for working on this, much much appreciated and I look forward to this landing so that I don't have to run my prototype check (#423) in production.

As you know, we use Kafka heavily in production, and I wrote the initial prototype for fetching consumer offsets from Kafka rather than Zookeeper (#423). I am also a maintainer of the kafka-python project, so I have some experience here.

At my day job, I am the "owner" of Kafka, and I support a number of development teams writing microservices that talk to Kafka. I want a check that I can run against the broker that will regularly fetch all offsets for all consumer groups. We have significant production deployments of old-style zookeeper-based consumers, old-style kafka-based consumers, and new-style kafka-based consumers.

I strongly care about not having to explicitly list consumer groups because it is an operational headache to manually add/remove consumer groups every time a service is spun up/down. Part of this is because we have multiple clusters scattered across multiple datacenters, so I have to keep track of all the lists for all the clusters. I would much rather have a single version of the config that I can deploy that keeps itself auto-updated so that this is one less thing to worry about. That's why I initially wrote the monitor_unlisted_consumer_groups feature in #271 (merged as part of #612).

So given that background, I wanted to voice several concerns about the general direction this check is heading:

  1. How do I configure the check so that it checks kafka-based offsets all the time, but zookeeper offsets far less frequently?
    • In production, we don't run the zookeeper check more frequently than once every 10 minutes to avoid hitting zookeeper too hard.
    • But we don't rate limit the kafka check because the load on the kafka brokers is trivial since they have an in-memory cache of consumer offsets. My users much prefer this because changes in consumption show up immediately in the dashboard, as well as the higher granularity when debugging.
    • Currently, we rate-limit the zookeeper collection through the min_collection_interval param (details in [kafka_consumer] Show how to check kafka consumer lag less often #212), which applies to the entire check. I'm not aware of a way to have a single check rate-limit only a portion of the check, which means that we're stuck between either hitting zookeeper very frequently, or only checking our kafka-based consumers every 10 minutes, neither of which we're excited about. Is there a way to accomplish this that I'm unaware of?
  2. This check currently punts on supporting monitor_unlisted_consumer_groups from Kafka:
    • I've been down this road myself when I originally prototyped [kafka_consumer] Support fetching Kafka consumer offsets stored in kafka #423... based on that experience, I think that rather than completely punting you should spend some time thinking about what functionality you plan to support long term, even if you only stub it out now and don't finish the implementation until later.
    • My suggestion is to minimize the pain by limiting the kafka-based consumer support for this to only new-style consumer groups on brokers >= 0.10.2. Unfortunately, you pretty much have to require that all old-style kafka-based consumers be explicitly listed. More details in point 4 here: [kafka_consumer] Support fetching Kafka consumer offsets stored in kafka #423 (comment)
    • Making this decision now is important because even if you keep things simple by only supporting monitor_unlisted_consumer_groups for new-style kafka-based consumers, you'll still need to retain support for old-style kafka-based consumers, and this might significantly impact both the config and the code... I rewrote [kafka_consumer] Support fetching Kafka consumer offsets stored in kafka #423 at least twice as I worked through the edge cases.
    • Regardless of what you choose, I want to know upfront what the plans are for this feature, as this is a critical feature for me.
  3. Limiting the check to 100 partitions if monitor_unlisted_consumer_groups is True. This one really scares me; unfortunately I didn't notice until now as [kafka_consumer] set an upper bound to the number of contexts. #753 kinda snuck in there:
    • I think this limit was set without an operational understanding of Kafka internals because the topic __consumer_offsets defaults to 50 partitions right out of the gate, so you're effectively limiting users to only 50 usable partitions in their cluster.
    • Most folks that I talk with who are using Kafka specifically chose Kafka over other technologies because they want its horizontal scalability... even small clusters with just a few brokers typically run multiple consumer processes (where the heavy lifting happens) per consumer group. Kafka requires that within a topic, there be at least one partition per consumer, otherwise the consumer process sits idle... so everyone with any kind of load on their data pipeline runs more than 50 partitions...
    • I'm not seeing a way to change this limit without copy/pasting the check and manually changing CONTEXT_UPPER_BOUND (which is a rather obfuscated name for what is effectively the limit on reported metrics). Did I miss something?
    • I understand the rationale of preventing users from absent-mindedly dumping tons of unused metrics into Datadog, but you already have protection against this because monitor_unlisted_consumer_groups defaults to False. And anyone whose cluster has so many partitions that it's causing problems for Datadog will already be experiencing production issues because an untuned Kafka cluster struggles with more than ~10K-20K partitions. So this doesn't protect against absent-mindedness, it simply erects an additional barrier.
    • Since you're erecting this additional barrier, do you not want to support users who choose to enable monitor_unlisted_consumer_groups and have more than 50 partitions? It's your business and your decision, but if so, I want to know about it now so that we can start transitioning away from Datadog, as this limit is totally unrealistic for us.

attempts = 0
while not client.ready(nodeid):
    if attempts > DEFAULT_KAFKA_RETRIES:
        self.log.error("unable to connect to broker id: {} after {} attempts".format(nodeid, DEFAULT_KAFKA_RETRIES))
Contributor commented:

use logging's built-in string interpolation as it will be more performant:

self.log.error("unable to connect to broker id: %i after %i attempts", nodeid, DEFAULT_KAFKA_RETRIES)

also, for readability, can you name it node_id (pythonic) or nodeId (to match kafka-python)?

@truthbk (Member Author) commented:

Strictly speaking, the lazy interpolation is only more performant when the entry's log level is below the logger's configured threshold, so the message is never rendered. Since this is an error-level entry, the string would almost always be interpolated anyway, with no real performance gain.
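
To make the trade-off concrete, a small standalone sketch (illustrative, not code from this PR):

import logging

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger("kafka_consumer")

# Filtered out by the WARNING threshold: with lazy %-style arguments the
# message is never rendered, so the lazy form saves the formatting cost.
log.debug("unable to connect to broker id: %i after %i attempts", 3, 4)

# An ERROR record passes the threshold and is rendered either way, so here
# the lazy form buys consistency rather than speed.
log.error("unable to connect to broker id: %i after %i attempts", 3, 4)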

        break
    attempts = attempts + 1
    delay = (2 ** attempts) + (random.randint(0, 1000) / 1000) * 0.01  # starting at 20 ms
    self.log.info("broker id: %s is not ready yet, sleeping for %f ms", nodeid, delay * 10)
@jeffwidman (Contributor) commented Oct 17, 2017

Shouldn't this log line be next to the actual sleep(delay) call? If I'm reading this right, it looks like you're always emitting the log entry even though the code will not actually trigger the delay unless future.failed().

Why delay * 10? Aren't these already in ms?

        self.log.error("unable to connect to broker id: {} after {} attempts".format(nodeid, DEFAULT_KAFKA_RETRIES))
        break
    attempts = attempts + 1
    delay = (2 ** attempts) + (random.randint(0, 1000) / 1000) * 0.01  # starting at 20 ms
Contributor commented:

x + (y / 1000) * 0.01 is effectively x + y * 0.00001 due to the mathematical order of operations, so the jitter term is negligible
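
A sketch of what the "starting at 20 ms" comment appears to intend (the parenthesization here is an assumption, not the code as merged): scale the whole sum, so attempt 1 sleeps roughly 20 ms plus jitter.

import random
import time

def backoff_delay(attempt):
    # exponential backoff with jitter, scaled so attempt 1 waits ~20 ms;
    # the outer parentheses make 0.01 multiply the whole sum rather than
    # only the jitter term, as happens in the quoted expression
    return ((2 ** attempt) + random.random()) * 0.01  # seconds

for attempt in range(1, 4):
    time.sleep(backoff_delay(attempt))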

@@ -102,17 +183,75 @@ def _make_req_async(self, client, request, nodeid=None, cb=None):
        if cb:
            future.add_callback(cb, request, nodeid, self.current_ts)

    def _ensure_ready_node(self, client, nodeid):
        if not nodeid:
            raise Exception("nodeid is None")
Contributor commented:

Can you raise a more specific exception? Perhaps this should be a ValueError?

Contributor commented:

Also noticed the check is whether nodeid is falsey, but the exception message says None... If you're truly just trying to guard against None then the check should be tightened up.
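
A minimal sketch combining both suggestions (hypothetical; it assumes broker id 0 must be accepted as valid):

def _ensure_ready_node(self, client, nodeid):
    # broker id 0 is falsey but valid, so test identity with None rather
    # than truthiness, and raise a specific exception type
    if nodeid is None:
        raise ValueError("nodeid is None")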

self.log.info("broker id: %s is not ready yet, sleeping for %f ms", nodeid, delay * 10)

future = client.cluster.request_update()
client.poll(future=future)
Contributor commented:

maybe add the inline comment # block until we get a response for folks less familiar with kafka-python?
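
That is, something along these lines:

future = client.cluster.request_update()
client.poll(future=future)  # block until we get a response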

key = "{}:{}:{}".format(consumer_group, topic, partition)
self._send_event(title, message, consumer_group_tags, 'consumer_lag',
                 key, severity="error")
self.log.debug(message)
@jeffwidman (Contributor) commented Oct 17, 2017

I would emit this as a log.error(), not log.debug()... this is a critical problem for the kafka cluster and will result in data loss/skippage.

@truthbk (Member Author) commented Oct 23, 2017

IMHO we shouldn't be relying on logging for these things. Logging should be more of a tool to diagnose whether the check is functioning as it should. If you do indeed have negative Kafka consumer lag, the problem is not really with the kafka_consumer check, but rather with your Kafka cluster or consumers. Logs are always subject to change, so I wouldn't recommend relying on them to identify a problem like this. We're already submitting an event, and in a case as critical as this you should probably define a monitor in the Datadog UI to get the visibility you need. With a monitor you can be notified via Slack, PagerDuty, email (the list goes on) as soon as this happens. And if we keep the event, we'd also get an entry in the event stream even if no monitor is defined. Is the log line really necessary?

Contributor commented:

Logging should be more of a tool to diagnose if the check is functioning as it should.

I don't actually fully agree with that. I see services emit warning/error log statements all the time that are due to the environment where the service is running. I agree completely that they are subject to change and not to be relied upon; they are just another vector for noticing things. Whenever I install a new check, one of the first things I do is check the logs to see if any unexpected warnings/errors are popping up, since typically we haven't been doing a great job of monitoring those services in the past and may have issues we need to deal with immediately.

tps[topic] = tps[unicode(topic)].union(set(partitions))

# TODO: find reliable way to decide what API version to use for
# OffsetFetchRequest.
Contributor commented:

KafkaClient.config['api_version'] will auto-probe the version for you, so why not just put in a mapping between OffsetFetchRequest and the broker version tuples?

@truthbk (Member Author) commented:

This wasn't really reliable when I started testing this out, but I will give it a shot again. 👍

@truthbk (Member Author) commented:

Good enough to decide between OffsetFetchRequest[0] and OffsetFetchRequest[1]
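
For reference, a sketch of that mapping, assuming kafka-python's probed client.config['api_version'] tuple (v0 of OffsetFetchRequest reads ZooKeeper-stored offsets; v1 requires brokers >= 0.8.2 and reads Kafka-stored ones):

from kafka.protocol.commit import OffsetFetchRequest

def pick_offset_fetch_request(client, from_kafka):
    # OffsetFetchRequest is a list indexed by API version in kafka-python
    if from_kafka and client.config['api_version'] >= (0, 8, 2):
        return OffsetFetchRequest[1]  # offsets stored in Kafka
    return OffsetFetchRequest[0]  # offsets stored in ZooKeeper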

# TODO: find reliable way to decide what API version to use for
# OffsetFetchRequest.
consumer_offsets = {}
broker_ids = [coord_id] if coord_id else [b.nodeId for b in client.cluster.brokers()]
Contributor commented:

I think you want to explicitly check for if coord_id is not None because Kafka brokers are allowed to have broker IDs of 0... i.e., there's a legitimate bug here.

My memory is fuzzy, but IIRC brokers early in the 0.8 days defaulted to broker ID of 0, or at least I've seen them in some of our environments and as the broker got upgraded the broker ID was never changed.
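
Sketched against the quoted line, the suggested fix:

# broker id 0 is legitimate, so test identity with None instead of truthiness
broker_ids = [coord_id] if coord_id is not None else [b.nodeId for b in client.cluster.brokers()]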

.travis.yml Outdated
- TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.9.0.1-1 FLAVOR_OPTIONS='zookeeper'
- TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.9.0.1-1 FLAVOR_OPTIONS='kafka'
- TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.10.2.0-1 FLAVOR_OPTIONS='zookeeper'
- TRAVIS_FLAVOR=kafka_consumer FLAVOR_VERSION=0.10.2.0-1 FLAVOR_OPTIONS='kafka'
Contributor commented:

any reason you don't run 0.10.2.1 and 0.11.0.1?

@truthbk (Member Author) commented:

Bumped to 0.10.2.1 and added 0.11.0.1 - thanks for raising this :)

Contributor commented:

I still see 0.10.2.0 rather than 0.10.2.1... am I misreading something?

self.log.warn("Kafka broker returned UNKNOWN_TOPIC_OR_PARTITION (error_code 3) for "
"topic: %s, partition: %s. This should only happen if the topic is currently being deleted.",
topic, partition)
elif error_code == 6:
elif error_code == KAFKA_NOT_LEADER_FOR_PARTITION:
Contributor commented:

👍 for using these

@masci (Contributor) left a comment

We reviewed the last details offline; this is good to go now.
Thanks everyone, very good stuff!

@masci masci modified the milestones: 5.19, 5.20 Nov 3, 2017
@masci (Contributor) commented Nov 3, 2017

Moved the milestone to 5.20 but ETA will remain the same (~2 weeks from now)

@masci masci merged commit 4c59324 into master Nov 7, 2017
@masci masci deleted the jaime/kafkaoffsets2 branch November 7, 2017 09:11
gml3ff pushed a commit that referenced this pull request May 14, 2020
* [windows] add MSI max timeout knob

* [windows] attribute: update attribute description

* [attributes] bump default to 900s to give more time for uninstalls on tiny instances