feat: DLQ unprocessable messages on ingest-events #66236
Conversation
This PR attempts to be somewhat cautious about DLQing and not DLQ anything that is even potentially retriable. This could be tweaked later. An alternative (and imo better) approach would be to validate the schema of failed messages and only then put them into the DLQ. However, no schema is currently registered for this topic, so this cannot be done easily.
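As a rough illustration of the cautious approach described here (a sketch only, not the actual diff: the Retriable class mirrors the one added in the changes below, while process_cautiously and its process callback are illustrative names):

```python
# Sketch: a payload that cannot even be decoded is DLQed via Arroyo's InvalidMessage;
# everything past decoding is wrapped in Retriable so the consumer retries instead.
import msgpack
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message


class Retriable(Exception):
    """Raised for errors assumed to be transient (network, DB, deploys)."""


def process_cautiously(raw_message: Message, process) -> None:
    raw_payload = raw_message.payload.value
    try:
        # A payload that cannot be unpacked is treated as unprocessable and DLQed.
        message = msgpack.unpackb(raw_payload, use_list=False)
    except Exception:
        raw_value = raw_message.value
        assert isinstance(raw_value, BrokerValue)
        raise InvalidMessage(raw_value.partition, raw_value.offset)
    try:
        # Anything that might succeed on retry must not be DLQed.
        process(message)
    except Exception as exc:
        raise Retriable(exc) from exc
```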
)
raw_payload = raw_message.payload.value
try:
    message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
none of this code changed, it's just indented in try/except block now
    return process_event(message, project, reprocess_only_stuck_events)
except Exception as exc:
    raise Retriable(exc)
just to be safe and avoid DLQing anything potentially retriable, we don't DLQ any exception from the process_event function
    return process_event(message, project, reprocess_only_stuck_events)
except Exception as exc:
    # Non retriable exceptions raise InvalidMessage, which Arroyo will DLQ.
not sure this makes a ton of sense. why put retriable errors in the DLQ if nobody can retry those DLQ items by consuming that DLQ?
It's the non-retriable errors, not the retriable ones, that go in the DLQ, so the consumer does not get stuck on those. The retriable errors are things that might be caused by network issues, temporary unavailability of the database, even a deploy, etc., and should not be DLQed.
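For a concrete sense of the split being described (purely illustrative, not the PR's actual classification):

```python
# Illustrative only: failures that usually clear up on retry versus failures that mean
# the message itself can never be processed.
RETRIABLE_EXAMPLES = (ConnectionError, TimeoutError)        # network blips, DB unavailability, deploys
UNPROCESSABLE_EXAMPLES = (ValueError, KeyError, TypeError)  # malformed or incomplete payloads
```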
That's what I mean. IMO retriable errors should go into the DLQ as well. The DLQ is meant to be replayed.
I think in an ideal world we'd do that eventually but I'm not sure if we're there yet, and we haven't tried it anywhere. IMO inspection of events and replaying needs to be built so it can be done in a really fast and easy manner before taking this step.
Today, DLQing everything in the case of temporary blips might cause a longer and more manual recovery period, and comes with its own risks that I haven't fully thought through yet.
Let's do it as a follow up later.
Would INC-660 messages have been marked as Retriable or InvalidMessage in this case?
In the meantime, I'm going to work on defining a schema for this topic so we can use that to determine what is valid or not.
No, what you have here seems good to start with, just that we should continue thinking about how to turn it into a general purpose failure handling tool.
Yes, agree this is not an end state. Just want to get the DLQ in place, and we can tweak what actually gets DLQed later.
During INC-660, we crashed inside process_event, here:
cache_key = event_processing_store.store(data)
See this Sentry error.
The Kafka message itself was populated, but the event JSON inside was empty.
@lynnagara How often does the consumer currently crash because of temporary network outages? If it's rare enough (say, once per month), I would personally go ahead and DLQ all exceptions.
I do believe this PR is an improvement (any DLQ is better than none), just pointing out that it would not have helped with INC-660.
Ok, I realised it's going to be a lot of work to figure out what's retriable and what isn't on a consumer-by-consumer basis. So I rewrote this to check against the schema whenever an exception is raised, to decide whether to DLQ it or not. Since there wasn't a schema for ingest-events, I added one here: getsentry/sentry-kafka-schemas#230
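A rough sketch of that "validate on failure" flow, assuming the topic name "ingest-events" and a generic process callback (illustrative, not the exact diff):

```python
import sentry_kafka_schemas
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message


def process_with_schema_fallback(raw_message: Message, process) -> None:
    raw_payload = raw_message.payload.value
    try:
        process(raw_payload)
    except Exception:
        try:
            # Codec lookup is cached by the library, so doing it per failure is cheap.
            codec = sentry_kafka_schemas.get_codec("ingest-events")
            codec.decode(raw_payload, validate=True)
        except Exception:
            # The payload does not even match the topic schema: route it to the DLQ.
            raw_value = raw_message.value
            assert isinstance(raw_value, BrokerValue)
            raise InvalidMessage(raw_value.partition, raw_value.offset)
        # Schema-valid payload: assume the failure was transient and let the consumer retry.
        raise
```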
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #66236 +/- ##
==========================================
- Coverage 84.26% 82.99% -1.28%
==========================================
Files 5307 5307
Lines 237299 237319 +20
Branches 41053 41053
==========================================
- Hits 199965 196965 -3000
- Misses 37116 40135 +3019
- Partials 218 219 +1
@untitaker @jjbayer Can I pick on you two to review this? Would be nice to have some eyes from both the ingest + unified consumer perspective.
Looks good to me, but should we add tests for the new code paths? E.g. one with an invalid message and one that mocks a connection error in process_event.
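For illustration, the invalid-message path could be exercised roughly like this; the import path of the function under test and its call arguments are assumptions, and the retriable-error test is only sketched in the trailing comment:

```python
# Hypothetical pytest sketch, not taken from the PR: a payload that neither unpacks as
# msgpack nor validates against the ingest-events schema should be routed to the DLQ
# via Arroyo's InvalidMessage.
from datetime import datetime, timezone

import pytest
from arroyo.backends.kafka import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message, Partition, Topic

# Assumed location of the function under test; exact signature may differ.
from sentry.ingest.consumer.simple_event import process_simple_event_message


def make_message(payload: bytes) -> Message[KafkaPayload]:
    return Message(
        BrokerValue(
            KafkaPayload(None, payload, []),
            Partition(Topic("ingest-events"), 0),
            1,
            datetime.now(timezone.utc),
        )
    )


def test_garbage_payload_is_dlqed() -> None:
    with pytest.raises(InvalidMessage):
        # "events" / False follow the consumer options shown in the diff; assumed signature.
        process_simple_event_message(make_message(b"not msgpack at all"), "events", False)


# A companion test for the retriable path would build (or load) a schema-valid payload,
# mock process_event to raise a transient error such as ConnectionError, and assert that
# the exception propagated by the consumer is not InvalidMessage.
```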
@@ -259,6 +259,7 @@ def ingest_events_options() -> list[click.Option]:
    "static_args": {
        "consumer_type": "events",
    },
    "dlq_topic": Topic.INGEST_EVENTS_DLQ,
Has the topic already been created in all environments?
yes
"attachments": Topic.INGEST_ATTACHMENTS, | ||
} | ||
|
||
|
This does not cover the attachment topic. The functions for attachments are process_attachments_and_events and decode_and_process_chunks. I think we should cover all of them. If you want to do attachments in a separate PR, that's ok.
Yes. One step at a time.
    codec.decode(raw_payload, validate=True)
except Exception:
    raw_value = raw_message.value
    assert isinstance(raw_value, BrokerValue)
Can it be anything else?
no
if default_topic != "ingest-events":
    raise

codec = sentry_kafka_schemas.get_codec(default_topic)
Isn't this a heavy operation? Any reason not to instantiate the codec only once?
it's cached in the library after the first time
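So calling it per message (or per failure) is fine; if one wanted the caching to be explicit at the call site, a thin wrapper like this would also work (illustrative, not part of the PR):

```python
# Optional, illustrative wrapper: memoize the codec lookup locally, on top of the
# caching sentry_kafka_schemas already does internally after the first call.
from functools import lru_cache

import sentry_kafka_schemas


@lru_cache(maxsize=None)
def get_ingest_codec(topic: str):
    return sentry_kafka_schemas.get_codec(topic)
```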
raise InvalidMessage(raw_value.partition, raw_value.offset)

return process_event(message, project, reprocess_only_stuck_events)
raise
I thought we were discussing inverting the way the DLQ classifies retriable errors: instead of putting the burden on the consumer developer to identify errors that should make us route the message to the DLQ, asking consumer developers to identify errors that should trigger a crash and treating everything else as InvalidMessage.
Is that still the case, and are you planning to deal with this as a separate PR/project?
This is still the case, but in this specific function it's very difficult because of the specific logic, the external systems that are depended on, and the number of things that can fail here. Practically, I think checking for both an exception and then the schema is the safest way in this one case.
@jjbayer I brought back the retriable exception around the processing store blocks to handle the case where the message still passes the schema but the inner part of the payload is invalid.
Suspect Issues: This pull request was deployed and Sentry observed related issues.
This PR depends on getsentry/sentry-kafka-schemas#230

It attempts to be somewhat cautious about DLQing and not DLQ anything that is even potentially retriable. This could be tweaked later.

If a non-retriable exception is raised, the consumer tries to further determine whether the message is actually bad (or it's some transient retriable error) by validating the message against the schema. The message will be DLQed only if that also fails.
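Putting the description and the review discussion together, the resulting flow looks roughly like the sketch below; function names and the store_and_process callback are simplified placeholders, not the exact code from the diff:

```python
import msgpack
import sentry_kafka_schemas
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message


class Retriable(Exception):
    """Transient failure: crash and retry the consumer, never DLQ."""


def process_ingest_event(raw_message: Message, store_and_process) -> None:
    raw_payload = raw_message.payload.value
    try:
        message = msgpack.unpackb(raw_payload, use_list=False)
        try:
            # Inner guard (see the processing-store discussion above): a message can pass
            # the topic schema while its inner event payload is still bad, or storing it
            # can fail transiently, so errors here are always treated as retriable.
            store_and_process(message)
        except Exception as exc:
            raise Retriable(exc) from exc
    except Retriable:
        raise
    except Exception:
        # Unpacking or dispatch failed: DLQ only if the payload also fails schema validation.
        try:
            codec = sentry_kafka_schemas.get_codec("ingest-events")
            codec.decode(raw_payload, validate=True)
        except Exception:
            raw_value = raw_message.value
            assert isinstance(raw_value, BrokerValue)
            raise InvalidMessage(raw_value.partition, raw_value.offset)
        raise
```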