From 03fe65c093c64505dc47a99ebd84591b044cf1c1 Mon Sep 17 00:00:00 2001 From: Nikhar Saxena <84807402+nikhars@users.noreply.github.com> Date: Fri, 8 Oct 2021 10:32:04 -0700 Subject: [PATCH] feat(post-process-forwarder) Add header for transaction processing (#28908) Add a kafka header called "transaction_forwarder". Currently it is set to False. Once the transaction post process forwarder is implemented and deployed in production, and it has caught up kafka offsets with the current post process forwarder, we will change the value based on the type of message. --- src/sentry/eventstream/kafka/backend.py | 28 ++++++++++++++++--------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/src/sentry/eventstream/kafka/backend.py b/src/sentry/eventstream/kafka/backend.py index 69c10a16c998d8..5c3415d622d0df 100644 --- a/src/sentry/eventstream/kafka/backend.py +++ b/src/sentry/eventstream/kafka/backend.py @@ -66,6 +66,10 @@ def encode_bool(value: Optional[bool]) -> str: def strip_none_values(value: Mapping[str, Optional[str]]) -> Mapping[str, str]: return {key: value for key, value in value.items() if value is not None} + # TODO: Change transaction_forwarder to be intelligent once transaction post process forwarder + # is implemented and caught up with current events post process forwarder. + transaction_forwarder = False + send_new_headers = options.get("eventstream:kafka-headers") if send_new_headers is True: @@ -80,19 +84,23 @@ def strip_none_values(value: Mapping[str, Optional[str]]) -> Mapping[str, str]: "is_new_group_environment": encode_bool(is_new_group_environment), "is_regression": encode_bool(is_regression), "skip_consume": encode_bool(skip_consume), + "transaction_forwarder": encode_bool(transaction_forwarder), } ) else: - return super()._get_headers_for_insert( - group, - event, - is_new, - is_regression, - is_new_group_environment, - primary_hash, - received_timestamp, - skip_consume, - ) + return { + **super()._get_headers_for_insert( + group, + event, + is_new, + is_regression, + is_new_group_environment, + primary_hash, + received_timestamp, + skip_consume, + ), + "transaction_forwarder": encode_bool(transaction_forwarder), + } def _send( self,