(sentry-metrics): Metrics indexer consumer #28431
Conversation
@fpacifici @jjbayer (cc @jan-auer): some questions/concerns/thoughts I have:
For release health we cannot really mock tag values, because the
I think it's not necessary from a functional perspective -- but maybe for partitioning?
Updates:
Thanks, this is a step in the right direction.
Will review again tomorrow with some more details on how to produce in an efficient manner.
I am not sure about the goal of that pubsub class that does not allow us to set a callback.
@click.option("--topic", default="ingest-metrics", help="Topic to get subscription updates from.")
@batching_kafka_options("metrics-consumer")
@configuration
def metrics_consumer(**options):
Is this going to start the consumer by default? I don't think we need it yet by default.
I've had to manually run sentry run metrics-consumer after running sentry devserver, so I don't think this starts by default.
How come we are adding a separate command rather than using the single ingest-consumer command, which runs the rest of the ingest consumers? We could still temporarily omit metrics from --all-consumer-types and only run it if explicitly called with the metrics consumer type, if that was the concern. Curious if there is another reason.
@lynnagara (cc @fpacifici) the ingest-consumer command ends up using the IngestConsumerWorker, whereas we want to use the MetricsIndexerWorker instead. I felt it was easier to keep these separate for now than to refactor the ingest-consumer command. It seems like this could easily be changed down the line if we wanted, but I'm open to changing it now if people feel strongly.
snuba_metrics_publisher = KafkaPublisher(
    kafka_config.get_kafka_producer_cluster_options(cluster_name),
    asynchronous=False,
)
snuba_metrics_publisher.publish(snuba_metrics["topic"], json.dumps(message))
This would flush for every single message, which is not ideal considering you have batches of messages: you can flush once per batch, before committing on the ingest topic.
We should also set the callback for each message.
I would have a look at this approach to see how to use the callback
https://github.com/getsentry/cdc/blob/8643ee7a5bf491755c46169c6841131521d34b6c/cdc/producer.py#L41-L161
You probably do not need all that complexity.
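To illustrate the pattern suggested above — produce asynchronously with a per-message delivery callback, then flush once per batch before committing — here is a self-contained sketch. StubProducer stands in for the real Kafka producer, and all names here are hypothetical, not code from this PR:

```python
import json

class StubProducer:
    """Stand-in for an async Kafka producer: buffers messages and fires
    each delivery callback when flush() is called (illustrative only)."""

    def __init__(self):
        self._pending = []

    def produce(self, topic, value, on_delivery):
        # Asynchronous: nothing is persisted yet, just queued.
        self._pending.append((topic, value, on_delivery))

    def flush(self, timeout=None):
        # Simulate the broker acknowledging every queued message.
        for topic, value, callback in self._pending:
            callback(None, (topic, value))  # error=None means success
        self._pending.clear()
        return 0  # number of messages still un-flushed

delivered = []

def on_delivery(error, message):
    """Delivery callback: only messages seen here are known to be persisted."""
    if error is not None:
        raise RuntimeError(f"delivery failed: {error}")
    delivered.append(message)

producer = StubProducer()
batch = [{"org_id": 1, "name": 100}, {"org_id": 1, "name": 101}]
for payload in batch:
    producer.produce("snuba-metrics", json.dumps(payload), on_delivery=on_delivery)

# One flush per batch, not per message; commit offsets only after
# the callbacks confirm every message in the batch was delivered.
producer.flush(5.0)
print(len(delivered))  # -> 2
```

The key point is that it is the callback, not the produce() call, that tells you a message is safe to commit past.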
Some follow-up on what a consumer should do:
When writing a consumer like this that needs to achieve a high throughput, there are a few elements to take into account and a few requirements:
- We cannot auto-commit, because we would be committing before processing the message. If there is an error in processing, the message is lost. This is taken into account, as we do not auto-commit.
- We don't need to commit after every offset. Less load on the broker and network.
- We must not lose messages. So if an error happens at any point during processing, we cannot commit the entire batch but only the portion that we sent to Kafka.
- Producing is asynchronous, so we need to wait for the callback before being sure that the message is persisted in a Kafka topic: https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html#asynchronous-writes
- We should really avoid duplicates, as they would not be deduplicated in Clickhouse, since metrics are stored in a pre-aggregated way. So if there is an error during processing that causes the consumer to crash, we should have committed up to the last acknowledged message (acknowledged meaning that we did receive the callback from the producer).
- We should keep producing asynchronously and not flush every message individually, as that would have a real impact on throughput.
- Exactly-once semantics is technically not achievable, as nobody can deduplicate messages. But we can make duplication extremely rare (basically only in case the Kafka commit fails multiple times, or the consumer crashes out of memory without being able to commit after flushing).
So there are a few ways to do that:
- Simple batching consumer. Do the processing phase, then the batch flush sends all the messages and at the end flushes and waits for callbacks. It only commits on Kafka the offset of the last callback received.
- Something like the cdc link above. Keep producing messages as soon as they are processed, and periodically commit the last offset we got the callback for.
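The second approach hinges on one piece of bookkeeping: commit only up to the end of the contiguous run of delivery-acknowledged offsets, so an in-flight message can never be skipped. A minimal sketch of that logic (names are illustrative, not from this PR):

```python
def last_committable(acked: set, last_committed: int) -> int:
    """Highest offset that is safe to commit: the end of the contiguous
    run of delivery-acknowledged offsets following last_committed."""
    offset = last_committed + 1
    while offset in acked:
        offset += 1
    return offset - 1

# Offsets 5, 6 and 8 have delivery callbacks, but 7 is still in flight:
# committing past 6 would lose message 7 if the consumer crashed now.
print(last_committable({5, 6, 8}, last_committed=4))  # -> 6
```

When the callback for offset 7 finally arrives, the committable offset jumps straight to 8, which is why duplicates stay rare: at worst, the already-acknowledged tail past a crash is reprocessed.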
This looks great! I did a manual end-to-end test locally and everything works as expected.
def resolve(self, organization: Organization, use_case: UseCase, string: str) -> Optional[int]:
Contrary to what I said previously, I think it would make sense to keep the resolve
method. It's already in use here, and if the indexer entries ever get a TTL, it would not make sense to prolong the retention every time the indexer is queried from the product side.
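The distinction being argued for is that resolve should be a pure read that never touches retention, while record is the write path that would refresh any future TTL. A toy, dict-backed sketch of that split (hypothetical names and storage, not the PR's actual implementation):

```python
from typing import Optional

class SketchIndexer:
    """Illustrative only: record() is the write path (and would refresh a
    TTL in a real store); resolve() is a read that never extends retention."""

    def __init__(self):
        self._strings = {}  # (org_id, string) -> int
        self._next_id = 1

    def record(self, org_id: int, string: str) -> int:
        key = (org_id, string)
        if key not in self._strings:
            self._strings[key] = self._next_id
            self._next_id += 1
        # A TTL-backed store would refresh the entry's expiry here.
        return self._strings[key]

    def resolve(self, org_id: int, string: str) -> Optional[int]:
        # Read-only lookup for the product/query side: no TTL refresh,
        # and unknown strings simply return None.
        return self._strings.get((org_id, string))
```

With this split, product-side queries calling resolve cannot accidentally keep otherwise-dead indexer entries alive.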
LGTM. Nice job.
Good first step. Thanks
    on_delivery=self.callback,
)

messages_left = self.__producer.flush(5.0)
Please do this as a follow-up PR. We will want a metric here to measure how long we wait.
Please figure out what is wrong with CI before merging.
The UseCase arg was removed in #28431.
* WIP(metrics): Metrics indexer consumer
* use redis for mock indexer
* add bulk_record and async producing
* make redis mock indexer separate file
* fix type errors
* add comment
* remove UseCase and updates tests
* update more tests
* clean up part I
* mini cleanup
* add basic tests
* all org_ids are ints
* missed one
* more clean up
* consumer test
* rename test file
* lil updates
* attempt to fix tests
* try dis tho
Metrics Indexer Consumer:
Messages produced by Relay into the ingest-metrics topic have a metric name along with any number of tag key/value string pairs associated with the metric. The snuba topic snuba-metrics (which can be changed here) expects integers instead of strings. The indexer (which will be implemented later) will store the string-to-int relationship in postgres, but for now this just uses the mock dummy indexer to actually do the conversion.
In this PR the consumer consumes messages from the ingest-metrics topic, translates the payload to have ints instead of strings, and then produces to the snuba-metrics topic so that snuba can store the data.
RedisMockIndexer
The temporary redis indexer can be used by changing the following in conf/server.py:
It also uses a bulk_record method to be able to get and set all the strings (metric name, tag keys and values) for a message at once.
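As a rough illustration of what a bulk_record-style API does — resolving the metric name and every tag key and value for a message in a single call, assigning fresh ids to unseen strings — here is a dict-backed sketch. The real version would batch its Redis reads and writes; every detail below is an assumption for illustration:

```python
class MockBulkIndexer:
    """Dict-backed sketch of bulk_record: one call resolves all the
    strings of a message at once (illustrative, not the PR's code)."""

    def __init__(self):
        self._map = {}   # (org_id, string) -> int
        self._next = 1

    def bulk_record(self, org_id: int, strings: list) -> dict:
        # Conceptually one round-trip: fetch every existing mapping,
        # then assign and store ids for the misses in a single batch.
        out = {}
        for s in strings:
            key = (org_id, s)
            if key not in self._map:
                self._map[key] = self._next
                self._next += 1
            out[s] = self._map[key]
        return out

indexer = MockBulkIndexer()
# Metric name plus a tag key and tag value, resolved together:
ids = indexer.bulk_record(1, ["session", "environment", "production"])
print(ids)  # -> {'session': 1, 'environment': 2, 'production': 3}
```

Batching matters because each message carries several strings; resolving them one at a time would turn every message into several store round-trips.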