From 3ebab9819c15c404c0894690f33f827736e4fdd1 Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Fri, 15 Mar 2024 13:25:55 -0700
Subject: [PATCH] feat: DLQ unprocessable messages on ingest-events (#66236)

This PR depends on https://github.com/getsentry/sentry-kafka-schemas/pull/230

It attempts to be somewhat cautious about DLQing and to not DLQ anything that
is even potentially retriable. This could be tweaked later.

If a non-retriable exception is raised, the consumer tries to further determine
whether the message is actually bad (or it's some transient retriable error) by
validating the message against the schema. The message will be DLQed only if
that also fails.
---
 src/sentry/consumers/__init__.py              |   1 +
 src/sentry/ingest/consumer/processors.py      | 145 ++++++++++--------
 src/sentry/ingest/consumer/simple_event.py    |  67 ++++++--
 tests/relay_integration/test_sdk.py           |   2 +-
 .../sentry/ingest/ingest_consumer/test_dlq.py |  70 +++++++++
 5 files changed, 203 insertions(+), 82 deletions(-)
 create mode 100644 tests/sentry/ingest/ingest_consumer/test_dlq.py

diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py
index df9311ea9efc45..c5ec2958406e3a 100644
--- a/src/sentry/consumers/__init__.py
+++ b/src/sentry/consumers/__init__.py
@@ -259,6 +259,7 @@ def ingest_events_options() -> list[click.Option]:
         "static_args": {
             "consumer_type": "events",
         },
+        "dlq_topic": Topic.INGEST_EVENTS_DLQ,
     },
     "ingest-feedback-events": {
         "topic": Topic.INGEST_FEEDBACK_EVENTS,
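Taken together, the changes below implement the decision flow described above. A minimal sketch of that flow, for illustration only — apart from `Retriable`, the names here are hypothetical and do not appear in the patch:

```python
# Illustrative sketch of the DLQ decision described above; not code from this patch.
class Retriable(Exception):
    """Raised when a failure may be transient and the message should be retried."""


def handle_failure(exc: Exception, payload_is_schema_valid: bool) -> str:
    """Decide what the consumer should do with a message that failed processing."""
    if isinstance(exc, Retriable):
        return "re-raise"  # potentially transient: never DLQ
    if payload_is_schema_valid:
        return "re-raise"  # payload looks fine, so the failure may still be transient
    return "dlq"  # non-retriable failure and the payload does not even pass the schema


assert handle_failure(Retriable(TimeoutError("redis timed out")), True) == "re-raise"
assert handle_failure(ValueError("bad payload"), False) == "dlq"
```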
diff --git a/src/sentry/ingest/consumer/processors.py b/src/sentry/ingest/consumer/processors.py
index 5773bca1c11931..42e20cec2d16c3 100644
--- a/src/sentry/ingest/consumer/processors.py
+++ b/src/sentry/ingest/consumer/processors.py
@@ -32,6 +32,10 @@
 IngestMessage = Mapping[str, Any]
 
 
+class Retriable(Exception):
+    pass
+
+
 def trace_func(**span_kwargs):
     def wrapper(f):
         @functools.wraps(f)
@@ -127,78 +131,85 @@ def process_event(
     ):
         return
 
-    # If we only want to reprocess "stuck" events, we check if this event is already in the
-    # `processing_store`. We only continue here if the event *is* present, as that will eventually
-    # process and consume the event from the `processing_store`, whereby getting it "unstuck".
-    if reprocess_only_stuck_events and not event_processing_store.exists(data):
-        return
-
-    with metrics.timer("ingest_consumer._store_event"):
-        cache_key = event_processing_store.store(data)
-
+    # Raise the retriable exception and skip DLQ if anything below this point fails as it may be caused by
+    # an intermittent network issue
     try:
-        # Records rc-processing usage broken down by
-        # event type.
-        event_type = data.get("type")
-        if event_type == "error":
-            app_feature = "errors"
-        elif event_type == "transaction":
-            app_feature = "transactions"
-        else:
-            app_feature = None
-
-        if app_feature is not None:
-            record(settings.EVENT_PROCESSING_STORE, app_feature, len(payload), UsageUnit.BYTES)
-    except Exception:
-        pass
-
-    if attachments:
-        with sentry_sdk.start_span(op="ingest_consumer.set_attachment_cache"):
-            attachment_objects = [
-                CachedAttachment(type=attachment.pop("attachment_type"), **attachment)
-                for attachment in attachments
-            ]
-
-            attachment_cache.set(cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT)
-
-    if data.get("type") == "transaction":
-        # No need for preprocess/process for transactions thus submit
-        # directly transaction specific save_event task.
-        save_event_transaction.delay(
-            cache_key=cache_key,
-            data=None,
-            start_time=start_time,
-            event_id=event_id,
-            project_id=project_id,
-        )
-    elif data.get("type") == "feedback":
-        if features.has("organizations:user-feedback-ingest", project.organization, actor=None):
-            save_event_feedback.delay(
-                cache_key=None,  # no need to cache as volume is low
-                data=data,
-                start_time=start_time,
-                event_id=event_id,
-                project_id=project_id,
-            )
-    else:
-        # Preprocess this event, which spawns either process_event or
-        # save_event. Pass data explicitly to avoid fetching it again from the
-        # cache.
-        with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
-            preprocess_event(
+        # If we only want to reprocess "stuck" events, we check if this event is already in the
+        # `processing_store`. We only continue here if the event *is* present, as that will eventually
+        # process and consume the event from the `processing_store`, whereby getting it "unstuck".
+        if reprocess_only_stuck_events and not event_processing_store.exists(data):
+            return
+
+        with metrics.timer("ingest_consumer._store_event"):
+            cache_key = event_processing_store.store(data)
+
+        try:
+            # Records rc-processing usage broken down by
+            # event type.
+            event_type = data.get("type")
+            if event_type == "error":
+                app_feature = "errors"
+            elif event_type == "transaction":
+                app_feature = "transactions"
+            else:
+                app_feature = None
+
+            if app_feature is not None:
+                record(settings.EVENT_PROCESSING_STORE, app_feature, len(payload), UsageUnit.BYTES)
+        except Exception:
+            pass
+
+        if attachments:
+            with sentry_sdk.start_span(op="ingest_consumer.set_attachment_cache"):
+                attachment_objects = [
+                    CachedAttachment(type=attachment.pop("attachment_type"), **attachment)
+                    for attachment in attachments
+                ]
+
+                attachment_cache.set(
+                    cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT
+                )
+
+        if data.get("type") == "transaction":
+            # No need for preprocess/process for transactions thus submit
+            # directly transaction specific save_event task.
+            save_event_transaction.delay(
                 cache_key=cache_key,
-                data=data,
+                data=None,
                 start_time=start_time,
                 event_id=event_id,
-                project=project,
-                has_attachments=bool(attachments),
+                project_id=project_id,
             )
-
-    # remember for an 1 hour that we saved this event (deduplication protection)
-    cache.set(deduplication_key, "", CACHE_TIMEOUT)
-
-    # emit event_accepted once everything is done
-    event_accepted.send_robust(ip=remote_addr, data=data, project=project, sender=process_event)
+        elif data.get("type") == "feedback":
+            if features.has("organizations:user-feedback-ingest", project.organization, actor=None):
+                save_event_feedback.delay(
+                    cache_key=None,  # no need to cache as volume is low
+                    data=data,
+                    start_time=start_time,
+                    event_id=event_id,
+                    project_id=project_id,
+                )
+        else:
+            # Preprocess this event, which spawns either process_event or
+            # save_event. Pass data explicitly to avoid fetching it again from the
+            # cache.
+            with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
+                preprocess_event(
+                    cache_key=cache_key,
+                    data=data,
+                    start_time=start_time,
+                    event_id=event_id,
+                    project=project,
+                    has_attachments=bool(attachments),
+                )
+
+        # remember for 1 hour that we saved this event (deduplication protection)
+        cache.set(deduplication_key, "", CACHE_TIMEOUT)
+
+        # emit event_accepted once everything is done
+        event_accepted.send_robust(ip=remote_addr, data=data, project=project, sender=process_event)
+    except Exception as exc:
+        raise Retriable(exc)
 
 
 @trace_func(name="ingest_consumer.process_attachment_chunk")
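The try/except added above turns every failure past the dedup check into `Retriable`, so nothing raised from `process_event` itself is ever DLQed. The same pattern in isolation, as a runnable sketch — the decorator and `flaky_store` are illustrative assumptions, not part of this patch, which simply inlines the try/except:

```python
# Sketch of the wrap-and-mark-retriable pattern used in process_event.
# Only Retriable corresponds to code in this patch; the rest is illustrative.
import functools
from collections.abc import Callable
from typing import Any, TypeVar

T = TypeVar("T")


class Retriable(Exception):
    pass


def mark_failures_retriable(func: Callable[..., T]) -> Callable[..., T]:
    """Re-raise any failure from func as Retriable so callers never DLQ it."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # Chain the original exception so the traceback is preserved.
            raise Retriable(exc) from exc

    return wrapper


@mark_failures_retriable
def flaky_store(payload: bytes) -> None:
    raise TimeoutError("redis timed out")  # stand-in for a transient failure


try:
    flaky_store(b"{}")
except Retriable as exc:
    print(f"retriable failure, will not be DLQed: {exc!r}")
```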
diff --git a/src/sentry/ingest/consumer/simple_event.py b/src/sentry/ingest/consumer/simple_event.py
index d705fc2fa305de..e1278ab8890c06 100644
--- a/src/sentry/ingest/consumer/simple_event.py
+++ b/src/sentry/ingest/consumer/simple_event.py
@@ -1,17 +1,28 @@
 import logging
 
 import msgpack
+import sentry_kafka_schemas
 from arroyo.backends.kafka.consumer import KafkaPayload
-from arroyo.types import Message
+from arroyo.dlq import InvalidMessage
+from arroyo.types import BrokerValue, Message
 
+from sentry.conf.types.kafka_definition import Topic
 from sentry.models.project import Project
 from sentry.utils import metrics
 
-from .processors import IngestMessage, process_event
+from .processors import IngestMessage, Retriable, process_event
 
 logger = logging.getLogger(__name__)
 
+consumer_type_to_default_topic = {
+    "events": Topic.INGEST_EVENTS,
+    "transactions": Topic.INGEST_TRANSACTIONS,
+    "attachments": Topic.INGEST_ATTACHMENTS,
+    "ingest-feedback-events": Topic.INGEST_FEEDBACK_EVENTS,
+}
+
 
 def process_simple_event_message(
     raw_message: Message[KafkaPayload], consumer_type: str, reprocess_only_stuck_events: bool
 ) -> None:
@@ -36,19 +47,47 @@ def process_simple_event_message(
         tags={"consumer": consumer_type},
         unit="byte",
     )
 
-    message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
-
-    message_type = message["type"]
-    project_id = message["project_id"]
+    try:
+        message: IngestMessage = msgpack.unpackb(raw_payload, use_list=False)
+
+        message_type = message["type"]
+        project_id = message["project_id"]
 
-    if message_type != "event":
-        raise ValueError(f"Unsupported message type: {message_type}")
+        if message_type != "event":
+            raise ValueError(f"Unsupported message type: {message_type}")
 
-    try:
-        with metrics.timer("ingest_consumer.fetch_project"):
-            project = Project.objects.get_from_cache(id=project_id)
-    except Project.DoesNotExist:
-        logger.exception("Project for ingested event does not exist: %s", project_id)
-        return
+        try:
+            with metrics.timer("ingest_consumer.fetch_project"):
+                project = Project.objects.get_from_cache(id=project_id)
+        except Project.DoesNotExist:
+            logger.exception("Project for ingested event does not exist: %s", project_id)
+            return
+
+        return process_event(message, project, reprocess_only_stuck_events)
+
+    except Exception as exc:
+        # If the retriable exception was raised, we should not DLQ
+        if isinstance(exc, Retriable):
+            raise
+
+        # If no retriable exception was raised, check the schema to decide whether to DLQ
+        default_topic = consumer_type_to_default_topic[consumer_type].value
+
+        # TODO: Currently, there is only a schema for ingest-events, so just continue to re-raise
+        # the exception if it's a different topic. This can be removed once attachments and transactions
+        # have schemas too.
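Condensed, the new except branch behaves like the sketch below. It uses the same `sentry_kafka_schemas` and `arroyo` calls as the patch; the helper function itself and the way it would be wired in are assumptions made purely for illustration:

```python
# Condensed sketch of the schema-gated DLQ decision; the helper is illustrative,
# but the library calls (get_codec, decode(validate=True), InvalidMessage) are the
# same ones used in the patch.
import sentry_kafka_schemas
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message


def reraise_or_dlq(exc: Exception, raw_message: Message[KafkaPayload], topic: str) -> None:
    """Re-raise transient errors; raise InvalidMessage only for schema-invalid payloads."""
    if topic != "ingest-events":
        # Only ingest-events has a schema today, so other topics are never DLQed here.
        raise exc

    codec = sentry_kafka_schemas.get_codec(topic)
    try:
        codec.decode(raw_message.payload.value, validate=True)
    except Exception:
        # The payload does not even pass schema validation: park it on the DLQ.
        raw_value = raw_message.value
        assert isinstance(raw_value, BrokerValue)
        raise InvalidMessage(raw_value.partition, raw_value.offset) from exc

    # The payload is well-formed, so the failure may be transient: let the consumer retry.
    raise exc
```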
+        if default_topic != "ingest-events":
+            raise
+
+        codec = sentry_kafka_schemas.get_codec(default_topic)
+
+        try:
+            codec.decode(raw_payload, validate=True)
+        except Exception:
+            raw_value = raw_message.value
+            assert isinstance(raw_value, BrokerValue)
+
+            raise InvalidMessage(raw_value.partition, raw_value.offset)
 
-    return process_event(message, project, reprocess_only_stuck_events)
+        raise
diff --git a/tests/relay_integration/test_sdk.py b/tests/relay_integration/test_sdk.py
index e09817076f5362..b2f78ff032a97e 100644
--- a/tests/relay_integration/test_sdk.py
+++ b/tests/relay_integration/test_sdk.py
@@ -89,7 +89,7 @@ def test_recursion_breaker(settings, post_event_with_sdk):
     with mock.patch(
         "sentry.event_manager.EventManager.save", spec=Event, side_effect=ValueError("oh no!")
     ) as save:
-        with pytest.raises(ValueError):
+        with pytest.raises(Exception):
             post_event_with_sdk({"message": "internal client test", "event_id": event_id})
 
     assert_mock_called_once_with_partial(save, settings.SENTRY_PROJECT, cache_key=f"e:{event_id}:1")
diff --git a/tests/sentry/ingest/ingest_consumer/test_dlq.py b/tests/sentry/ingest/ingest_consumer/test_dlq.py
new file mode 100644
index 00000000000000..3f098cdd238c78
--- /dev/null
+++ b/tests/sentry/ingest/ingest_consumer/test_dlq.py
@@ -0,0 +1,70 @@
+import time
+from datetime import datetime
+from unittest.mock import Mock
+
+import msgpack
+import pytest
+from arroyo.backends.kafka import KafkaPayload
+from arroyo.dlq import InvalidMessage
+from arroyo.types import BrokerValue, Message, Partition, Topic
+
+from sentry.ingest.consumer.factory import IngestStrategyFactory
+from sentry.testutils.pytest.fixtures import django_db_all
+
+
+def make_message(payload: bytes, partition: Partition, offset: int) -> Message:
+    return Message(
+        BrokerValue(
+            KafkaPayload(None, payload, []),
+            partition,
+            offset,
+            datetime.now(),
+        )
+    )
+
+
+@django_db_all
+def test_dlq_invalid_messages(factories) -> None:
+    organization = factories.create_organization()
+    project = factories.create_project(organization=organization)
+
+    valid_payload = msgpack.packb(
+        {
+            "type": "event",
+            "project_id": project.id,
+            "payload": b"{}",
+            "start_time": int(time.time()),
+            "event_id": "aaa",
+        }
+    )
+
+    bogus_payload = b"bogus message"
+
+    partition = Partition(Topic("ingest-events"), 0)
+    offset = 5
+
+    factory = IngestStrategyFactory(
+        "events",
+        reprocess_only_stuck_events=False,
+        num_processes=1,
+        max_batch_size=1,
+        max_batch_time=1,
+        input_block_size=None,
+        output_block_size=None,
+    )
+    strategy = factory.create_with_partitions(Mock(), Mock())
+
+    # Valid payload raises original error
+    with pytest.raises(Exception) as exc_info:
+        message = make_message(valid_payload, partition, offset)
+        strategy.submit(message)
+    assert not isinstance(exc_info.value, InvalidMessage)
+
+    # Invalid payload raises InvalidMessage error
+
+    with pytest.raises(InvalidMessage) as exc_info:
+        message = make_message(bogus_payload, partition, offset)
+        strategy.submit(message)
+
+    assert exc_info.value.partition == partition
+    assert exc_info.value.offset == offset
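The same codec can also be handy when writing more DLQ tests or tooling. A hypothetical helper, not part of this PR, for checking whether a raw payload would pass the consumer's schema gate:

```python
# Hypothetical helper (not in this PR): check a raw payload against the same
# ingest-events schema the consumer consults before DLQing.
import sentry_kafka_schemas


def passes_ingest_events_schema(raw_payload: bytes) -> bool:
    codec = sentry_kafka_schemas.get_codec("ingest-events")
    try:
        codec.decode(raw_payload, validate=True)
    except Exception:
        return False
    return True
```

In the test above, `bogus_payload` is exactly the kind of payload that fails this check and is therefore DLQed; whether any particular well-formed event payload passes depends on the ingest-events schema itself.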