fix(reprocessing): Batch handle_remaining_events, fix more race conditions [INGEST-452] #29033
Conversation
```diff
@@ -2313,10 +2313,30 @@ def build_cdc_postgres_init_db_volume(settings):
 SENTRY_USE_UWSGI = True

+# When copying attachments for to-be-reprocessed events into processing store,
+# how large is an individual file chunk? Each chunk is stored as Redis key.
```
Thank you for adding comments on these constants.
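For context, a minimal sketch of what "each chunk is stored as Redis key" could look like; the chunk size value, function name, and key layout below are assumptions for illustration, not taken from the diff:

```python
# Hypothetical chunk size; the real default is set next to the comment above.
SENTRY_REPROCESSING_ATTACHMENT_CHUNK_SIZE = 2 ** 20  # 1 MiB per Redis key

def copy_attachment_chunks(client, cache_key, data):
    # One Redis key per chunk of the attachment, as the settings comment says.
    size = SENTRY_REPROCESSING_ATTACHMENT_CHUNK_SIZE
    for index, offset in enumerate(range(0, len(data), size)):
        client.set(f"attachment:{cache_key}:{index}", data[offset : offset + size])
```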
src/sentry/reprocessing2.py (Outdated)

```python
# event IDs in Redis. We need this because Snuba cannot handle many tiny
# messages and prefers big ones instead.
#
# Best performance is achieved when timestamps are close together. Luckily we
# happen to iterate through events ordered by timestamp.
```
The way this comment is written suggests that we should enforce this, or write this as a precondition.
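One way to act on this: promote the observation into an explicit precondition at the top of the batching helper. A sketch, with a hypothetical helper name:

```python
def push_remaining_events(client, key, datetime_to_event):
    # Precondition instead of a lucky accident: callers must iterate events
    # ordered by timestamp so the batches sent to Snuba stay well-clustered.
    timestamps = [ts for ts, _event_id in datetime_to_event]
    assert timestamps == sorted(timestamps), "events must be ordered by timestamp"
```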
src/sentry/reprocessing2.py (Outdated)

```python
project_id=project_id,
old_group_id=old_group_id,
new_group_id=new_group_id,
event_ids=event_ids_batch,
```
This sends about 20kB into the task queue. Are we comfortable with such large argument lists?
I thought we drew the line at event-payload-sized payloads, but I think it may be easy enough to refactor this code to simply keep things in Redis
> but I think it may be easy enough to refactor this code to simply keep things in Redis

It would be a nice separation if one task pushes to redis and the other task reads from it (but that's a nit).
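What the suggested separation might look like at the call site; `remaining_events_key` is a hypothetical parameter name, the rest of the signature comes from the snippet above:

```python
# Before: ~20 kB of event IDs travel through the Celery broker.
handle_remaining_events.delay(
    project_id=project_id,
    old_group_id=old_group_id,
    new_group_id=new_group_id,
    event_ids=event_ids_batch,
)

# After: one task pushes the batch into Redis and the other drains it; only
# a short key name goes through the broker.
handle_remaining_events.delay(
    project_id=project_id,
    old_group_id=old_group_id,
    new_group_id=new_group_id,
    remaining_events_key=new_key,  # hypothetical parameter
)
```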
src/sentry/reprocessing2.py (Outdated)

```python
# TODO: Redis 6 introduces LPOP with <count> argument, use here?
while True:
    row = client.lpop(key)
```
Since you intend to empty the key, can you check if you can atomically take the entire key value here instead of popping them individually?
There actually seems to be no way to do that; Redis doesn't have a "pop key" command. However, I moved it into the other Celery task now.
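For the record, a close approximation does exist even without Redis 6: wrapping `LRANGE` and `DEL` in a `MULTI`/`EXEC` transaction reads and clears the list atomically. A sketch with redis-py (not what the PR ended up doing; it renames the key instead, see further down):

```python
def take_whole_list(client, key):
    # LRANGE + DEL inside a single transaction: no other client can touch
    # the list between the read and the delete.
    with client.pipeline(transaction=True) as pipe:
        pipe.lrange(key, 0, -1)
        pipe.delete(key)
        rows, _deleted = pipe.execute()
    return rows
```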
src/sentry/reprocessing2.py (Outdated)

```python
key = f"re2:remaining:{project_id}:{old_group_id}"

if datetime_to_event:
    client.lpush(
```
It seems that `LPUSH` returns the list size, so we could omit the call to `LLEN` further down, but only if `LPUSH` is actually called. Not sure if it's worth it.
Yup, added.
src/sentry/reprocessing2.py (Outdated)

```python
# Our internal sync counters are counting over *all* events, but the
# progressbar in the frontend goes until max_events. Advance progressbar
# proportionally.
pending = int(int(pending) * info["totalEvents"] / float(info.get("syncCount", 1)))
```
This might be out of scope for this PR, but should we maybe just update the UI to reflect the number of events that are actually being touched?
Is there any other API client that relies on this information being a real event count, instead of a scaled-down version?
I think there are too many places whose wording we'd have to redesign. Besides the progressbar, the same counter is used in the activity feed of an issue.
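A worked example of the scaling line above, with made-up numbers; the field semantics are inferred from the code comment (`syncCount` is the internal counter's range over all events, `totalEvents` the UI-facing bound):

```python
info = {"totalEvents": 3, "syncCount": 1_000_000}  # "reprocess 3 of 1,000,000"
pending = 500_000  # internal counter: half of all events still outstanding

pending = int(int(pending) * info["totalEvents"] / float(info.get("syncCount", 1)))
assert pending == 1  # UI sees "1 of 3 pending" instead of a bar stuck at 3/3
```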
All concerns addressed, good to go from my end once E2E tests are passing.
Please attend the deploy and watch for new errors.
```python
try:
    # Rename `key` to a new temp key that is passed to celery task. We
    # use `renamenx` instead of `rename` only to detect UUID collisions.
    assert client.renamenx(key, new_key), "UUID collision for new_key?"
```
Optional: can we catch this and report it to Sentry?
I think we should halt reprocessing... if we simply continue with the next page, `key` might become too large.
We could potentially catch the assertion, delete `key` from Redis, and sort of chug on with reprocessing.

But I think the best way to recover from random errors in reprocessing could be: abort reprocessing, merge the old and new issues together via a regular issue merge, tell the user "sorry", and hope no data was lost.
Only minor comments, otherwise this looks good to me!
```diff
 if datetime_to_event:
-    client.lpush(
+    llen = client.lpush(
         key,
         *(f"{to_timestamp(datetime)};{event_id}" for datetime, event_id in datetime_to_event),
     )
     client.expire(key, settings.SENTRY_REPROCESSING_SYNC_TTL)
```
nit: This might simplify the condition further down:

```python
else:
    llen = client.llen(key)
```
fixed!
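Putting the diff and the suggestion together, the resulting code presumably looks roughly like this:

```python
if datetime_to_event:
    # LPUSH returns the new list length, so no extra LLEN roundtrip is
    # needed on this branch.
    llen = client.lpush(
        key,
        *(f"{to_timestamp(datetime)};{event_id}" for datetime, event_id in datetime_to_event),
    )
    client.expire(key, settings.SENTRY_REPROCESSING_SYNC_TTL)
else:
    llen = client.llen(key)
```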
```python
# `key` does not exist in Redis. `ResponseError` is a bit too broad
# but it seems we'd have to do string matching on error message
# otherwise.
return
```
Should we log anything here?
This condition happens "too often", i.e. whenever reprocessing is finished and we execute force-flushing.
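Reconstructing the surrounding control flow from the fragments in this thread, the early return presumably sits in a handler along these lines (`redis.exceptions.ResponseError` is what redis-py raises for "no such key" on a rename):

```python
from redis.exceptions import ResponseError

try:
    assert client.renamenx(key, new_key), "UUID collision for new_key?"
except ResponseError:
    # `key` does not exist in Redis: nothing buffered, nothing to flush.
    # Happens routinely when force-flushing after reprocessing finished.
    return
```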
```python
event_ids_batch.append(event_id)

client.delete(key)
```
`client.lpop()` was atomic, right? Is there any danger of two task instances running at the same time and working on the same `lrange`?
Since I now rename the key to something unique, I don't have concurrent access on it anymore.
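The pattern described here, as a sketch; the suffix format and function name are illustrative, while the base key format appears earlier in this review:

```python
import uuid

def claim_remaining_events(client, project_id, old_group_id):
    key = f"re2:remaining:{project_id}:{old_group_id}"
    # Rename the shared list onto a per-flush key. From here on, only this
    # call chain knows the key name, so draining it needs no locking.
    new_key = f"{key}:{uuid.uuid4().hex}"
    if not client.renamenx(key, new_key):
        raise RuntimeError("UUID collision for new_key?")
    return new_key
```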
```python
# How many event IDs to buffer up in Redis before sending them to Snuba. This
# is about "remaining events" exclusively.
SENTRY_REPROCESSING_REMAINING_EVENTS_BUF_SIZE = 500
```
This will be fine. When we decide to raise this number, please keep Search and Storage involved to ensure we do not go beyond the maximum ClickHouse query size.
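For scale, a back-of-the-envelope estimate: with entries shaped like `timestamp;event_id` (a 32-character hex event ID plus a float timestamp), a full buffer is roughly 500 × ~50 B ≈ 25 kB per Snuba message, the same order as the ~20 kB argument size measured earlier in this review, and far below ClickHouse's default `max_query_size` of 256 KiB.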
Seems good to me
We want to reprocess 10 events at a time, but that causes problems in Snuba. ClickHouse is not very good at processing many tiny INSERTs, and unfortunately every eventstream message (in our case `replace_groups`) maps to exactly one of those queries.

Implement simple Redis-based batching for `handle_remaining_events` so that we don't take ClickHouse down with our many tiny event ID requests. With this PR we send 500 events at a time to Snuba.

This unearthed a couple of other race conditions with regard to how we finish reprocessing. See the removed comment: "finish_reprocessing may execute sooner than the last reprocess_group", but it can also happen that `handle_remaining_events` runs after `exclude_groups`. This is not something I would expect to happen, but because Snuba replacements completely ignore group exclusions, it all just happens to work out in the end.

This PR changes our synchronization countdown to contain the full event count, and calls `mark_event_reprocessed` for every single event regardless of `max_events`. This makes our entire synchronization model simpler. Basically we now:

- set the sync counter to `<number of events in snuba>`
- spawn `handle_remaining_events` or `preprocess_event` for every event
- call `mark_event_reprocessed` unconditionally
- run `finish_reprocessing` once the counter hits zero

Before:

- set the sync counter to `min(max_events, <number of events in snuba>)`
- spawn `preprocess_event`, which will decrement the count, OR spawn `handle_remaining_events`, which won't (but sometimes it does, e.g. when an event is handled as "remaining" because of missing attachments)
- `finish_reprocessing` is scheduled before or after `exclude_groups`, but it doesn't matter because of Snuba internals

Unfortunately that Redis key is used for two things:

- internal synchronization, i.e. deciding when reprocessing is done
- the progress counter shown in the UI

Meaning that when we change this, it has UI impact. In a job "reprocess m out of n events", the UI should show a progressbar going from 0 to m. With our change it would show a progressbar going from 0 to n. To counteract that, we still report the bounds of the progressbar to the UI as before, and additionally downscale our sync counter to be within the bounds `0..m` again whenever the UI needs it.

The only UI-visible impact this has is that the progressbar is advanced more evenly: when reprocessing 3 out of 1000000 events, it used to be that the progressbar immediately went to 3/3, then just sort of hung while Sentry was migrating/deleting the remaining events.
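A minimal sketch of the simplified synchronization model, assuming a plain Redis counter; the key name and helper signatures are illustrative, and `finish_reprocessing` is the task named above — only the overall scheme comes from this description:

```python
def start_reprocessing(client, group_id, events_in_snuba):
    # New model: the countdown covers *all* events, not min(max_events, n).
    client.set(f"re2:count:{group_id}", events_in_snuba)

def mark_event_reprocessed(client, group_id):
    # Called unconditionally for every event, whether it went through
    # preprocess_event or handle_remaining_events.
    pending = client.decr(f"re2:count:{group_id}")
    if pending == 0:
        finish_reprocessing(group_id)  # fires exactly once, after the last event
```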
Follow-up items