feat(post-process-forwarder) Create batch worker #28666
Conversation
This PR creates a post process forwarder worker which is a subclass of AbstractBatchWorker and can be used with BatchingKafkaConsumer in order to build the post process forwarder. There is a test case that verifies the functionality of the worker by mocking a message and passing it through the interface methods that would have been called by the BatchingKafkaConsumer. A separate PR will use this class in the post process forwarder.
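As a rough sketch of how such a worker plugs into the batching consumer (the interface names process_message/flush_batch/shutdown and the module path are my assumptions about the AbstractBatchWorker contract, not code from this PR):

from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional

from confluent_kafka import Message

from sentry.utils.batching_kafka_consumer import AbstractBatchWorker


class PostProcessForwarderWorker(AbstractBatchWorker):
    def __init__(self, concurrency: int) -> None:
        # Dispatch on a thread pool so a slow task does not block the
        # consumer's poll loop.
        self.__executor = ThreadPoolExecutor(max_workers=concurrency)

    def process_message(self, message: Message) -> Optional[Future]:
        # _get_task_kwargs_and_dispatch is the function reviewed below.
        return self.__executor.submit(_get_task_kwargs_and_dispatch, message)

    def flush_batch(self, batch) -> None:
        # Wait for every dispatch in the batch; surface the first failure
        # so the consumer does not commit offsets past it.
        for future in batch:
            exc = future.exception()
            if exc is not None:
                raise exc

    def shutdown(self) -> None:
        self.__executor.shutdown()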
I have added code from different places to the postprocessworker.py file, since it makes sense for the code to live here. The code hasn't been removed from those other places yet because it is still being used; once this code is tested, we can start removing the old code.
@contextmanager
def _sampled_eventstream_timer(instance: str) -> Generator[None, None, None]:
    record_metric = random.random() < 0.1
    if record_metric is True:
        with metrics.timer(_DURATION_METRIC, instance=instance):
            yield
    else:
        yield
def _get_task_kwargs(message: Message) -> Optional[Mapping[str, Any]]:
    use_kafka_headers = options.get("post-process-forwarder:kafka-headers")

    if use_kafka_headers:
        try:
            with _sampled_eventstream_timer(instance="get_task_kwargs_for_message_from_headers"):
                return get_task_kwargs_for_message_from_headers(message.headers())
        except Exception as error:
            logger.error("Could not forward message: %s", error, exc_info=True)
            with metrics.timer(_DURATION_METRIC, instance="get_task_kwargs_for_message"):
                return get_task_kwargs_for_message(message.value())
    else:
        with metrics.timer(_DURATION_METRIC, instance="get_task_kwargs_for_message"):
            return get_task_kwargs_for_message(message.value())
This has been taken from https://github.com/getsentry/sentry/blob/master/src/sentry/eventstream/kafka/backend.py#L281-L306, with the minor change that dispatching of the task happens in a different function.
def _get_task_kwargs_and_dispatch(message: Message):
    task_kwargs = _get_task_kwargs(message)
    if not task_kwargs:
        return None

    _record_metrics(message.partition(), task_kwargs)
    dispatch_post_process_group_task(**task_kwargs)
This is the whole loop found here https://github.com/getsentry/sentry/blob/master/src/sentry/eventstream/kafka/backend.py#L281-L309
def dispatch_post_process_group_task(
    event_id: str,
    project_id: int,
    group_id: Optional[int],
    is_new: bool,
    is_regression: bool,
    is_new_group_environment: bool,
    primary_hash: Optional[str],
    skip_consume: bool = False,
) -> None:
    if skip_consume:
        logger.info("post_process.skip.raw_event", extra={"event_id": event_id})
    else:
        cache_key = cache_key_for_event({"project": project_id, "event_id": event_id})

        post_process_group.delay(
            is_new=is_new,
            is_regression=is_regression,
            is_new_group_environment=is_new_group_environment,
            primary_hash=primary_hash,
            cache_key=cache_key,
            group_id=group_id,
        )
This has been taken from sentry/src/sentry/eventstream/base.py, lines 36 to 59 at 585b18e:
def _dispatch_post_process_group_task(
    self,
    event_id: str,
    project_id: int,
    group_id: Optional[int],
    is_new: bool,
    is_regression: bool,
    is_new_group_environment: bool,
    primary_hash: Optional[str],
    skip_consume: bool = False,
) -> None:
    if skip_consume:
        logger.info("post_process.skip.raw_event", extra={"event_id": event_id})
    else:
        cache_key = cache_key_for_event({"project": project_id, "event_id": event_id})
        post_process_group.delay(
            is_new=is_new,
            is_regression=is_regression,
            is_new_group_environment=is_new_group_environment,
            primary_hash=primary_hash,
            cache_key=cache_key,
            group_id=group_id,
        )
Some questions on the use case when partitions are revoked
    dispatch_post_process_group_task(**task_kwargs)


class PostProcessForwarderWorker(AbstractBatchWorker):
The post process forwarder commits the staged offsets when a partition is revoked. How do you plan to support that?
I have put down some of my thoughts here #28677 (comment)
I deliberately broke up the PR so that I could focus on smaller problems; I want to work on the use cases you mentioned in the integration PR.
Additional changes may be required to both the PostProcessForwarderWorker and the BatchingKafkaConsumer depending on how the integration works.
Actually you should be fine.
The batching consumer calls the flush method when partitions are revoked.
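For reference, a simplified sketch of that flow, assuming the consumer wires up a revoke callback that flushes the current batch and then commits synchronously (worker, batch, and staged_offsets are illustrative names, not the actual BatchingKafkaConsumer internals):

from confluent_kafka import Consumer


def on_revoke(consumer: Consumer, worker, batch, staged_offsets) -> None:
    # Drain everything already handed to the worker; a failure here
    # raises before any offsets are committed.
    worker.flush_batch(batch)
    # Commit synchronously (staged_offsets is a list of TopicPartition)
    # so the next owner of these partitions resumes right after the
    # last successfully flushed message.
    consumer.commit(offsets=staged_offsets, asynchronous=False)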
if exc is not None:
    raise exc
If this fails in the middle of a batch, we should not process and execute again the messages that we already successfully scheduled.
Now I am not 100% sure whether, when raising the exception here, all partitions will be revoked from this consumer. If that is the case, and you commit the staged offsets in the on_revoke method, you will achieve what you need by committing the offsets that were successfully scheduled.
If not, we will need to do that some other way.
I think on_revoke is called in that case, but could you please verify?
This is not a requirement for the post_process_group task since it handles this internally: there is a check for the cached event being present in the event processing store, and if the event is not present it has already been handled, so no duplicate work is done. The code can be found here https://github.com/getsentry/sentry/blob/master/src/sentry/tasks/post_process.py#L206-L215
If we need exactly-once guarantees from the consumer, it would be a little tricky since this uses a thread pool and the ordering of work is not guaranteed. For example, it's possible that the message at offset 10 raised an exception while the thread pool has already processed messages from offsets 0 to 20. Even if we then commit offset 10, messages from offsets 11 to 20 would be duplicated.
The batching Kafka consumer does provide a dead-letter topic if we want exactly-once semantics. Let me know your thoughts.
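To make that ordering problem concrete, here is a toy sketch (all names are illustrative, not from this PR) of why only the longest contiguous prefix of successes is safe to commit:

from typing import Set


def first_uncommittable_offset(completed: Set[int], start: int) -> int:
    """Return the lowest offset that has not completed successfully.

    Only offsets below this value are safe to commit.
    """
    offset = start
    while offset in completed:
        offset += 1
    return offset


# Offsets 0..20 were processed by the pool, but offset 10 raised:
completed = set(range(21)) - {10}
assert first_uncommittable_offset(completed, 0) == 10
# Committing 10 means offsets 11..20 are redelivered after a restart
# (at-least-once duplicates); committing past 10 would skip the failure.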
Actually, never mind. It already provides an at-least-once guarantee: if any message fails to be processed we raise an exception, and that's it. I don't think the on_revoke method will be called (mind testing this?), so the uncommitted messages would be reprocessed.
Thanks.
When you plug this into eventstream, it would be useful to avoid duplicating the logic between this consumer and eventstream (there are a lot of duplicated methods) that we could factor out and reuse.
if new_concurrency != self.__current_concurrency:
    logger.info(f"Switching post-process-forwarder to {new_concurrency} worker threads")
    self.__executor.shutdown(wait=True)
    self.__executor = ThreadPoolExecutor(max_workers=new_concurrency)
Let's track a metric with the current thread pool size. The logger.info is not going to be visible anywhere; at least with a metric we will have visibility into the current state when we make changes.
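For instance, a sketch of the suggested change, assuming sentry.utils.metrics exposes a gauge() helper (the metric name is illustrative, not from this PR):

from sentry.utils import metrics

if new_concurrency != self.__current_concurrency:
    self.__executor.shutdown(wait=True)
    self.__executor = ThreadPoolExecutor(max_workers=new_concurrency)
    self.__current_concurrency = new_concurrency
    # Emit the current pool size so it shows up on dashboards, where
    # logger.info would not be visible.
    metrics.gauge("eventstream.postprocessworker.concurrency", new_concurrency)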