diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py
index 1fbe6ff72e..0b7fc8117c 100644
--- a/sentry_sdk/consts.py
+++ b/sentry_sdk/consts.py
@@ -388,6 +388,7 @@ class OP:
     LANGCHAIN_AGENT = "ai.agent.langchain"
     LANGCHAIN_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.langchain"
     QUEUE_PROCESS = "queue.process"
+    QUEUE_PUBLISH = "queue.publish"
     QUEUE_SUBMIT_ARQ = "queue.submit.arq"
     QUEUE_TASK_ARQ = "queue.task.arq"
     QUEUE_SUBMIT_CELERY = "queue.submit.celery"
diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py
index 521d37dc86..1b2f6a5702 100644
--- a/sentry_sdk/integrations/celery/__init__.py
+++ b/sentry_sdk/integrations/celery/__init__.py
@@ -1,4 +1,5 @@
 import sys
+from collections.abc import Mapping
 from functools import wraps
 
 import sentry_sdk
@@ -47,6 +48,7 @@
         Retry,
         SoftTimeLimitExceeded,
     )
+    from kombu import Producer
 except ImportError:
     raise DidNotEnable("Celery not installed")
 
@@ -82,6 +84,7 @@ def setup_once():
         _patch_build_tracer()
         _patch_task_apply_async()
         _patch_worker_exit()
+        _patch_producer_publish()
 
         # This logger logs every status of every task that ran on the worker.
         # Meaning that every task's breadcrumbs are full of stuff like "Task
@@ -433,3 +436,44 @@ def sentry_workloop(*args, **kwargs):
                     sentry_sdk.flush()
 
     Worker.workloop = sentry_workloop
+
+
+def _patch_producer_publish():
+    # type: () -> None
+    original_publish = Producer.publish
+
+    @ensure_integration_enabled(CeleryIntegration, original_publish)
+    def sentry_publish(self, *args, **kwargs):
+        # type: (Producer, *Any, **Any) -> Any
+        kwargs_headers = kwargs.get("headers", {})
+        if not isinstance(kwargs_headers, Mapping):
+            # Ensure kwargs_headers is a Mapping, so we can safely call get()
+            kwargs_headers = {}
+
+        task_name = kwargs_headers.get("task")
+        task_id = kwargs_headers.get("id")
+        retries = kwargs_headers.get("retries")
+
+        routing_key = kwargs.get("routing_key")
+        exchange = kwargs.get("exchange")
+
+        with sentry_sdk.start_span(op=OP.QUEUE_PUBLISH, description=task_name) as span:
+            if task_id is not None:
+                span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id)
+
+            if exchange == "" and routing_key is not None:
+                # Empty exchange indicates the default exchange, meaning messages are
+                # routed to the queue with the same name as the routing key.
+                span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
+
+            if retries is not None:
+                span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)
+
+            with capture_internal_exceptions():
+                span.set_data(
+                    SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type
+                )
+
+            return original_publish(self, *args, **kwargs)
+
+    Producer.publish = sentry_publish
diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py
index 4f71d84809..2d94f03df4 100644
--- a/tests/integrations/celery/test_celery.py
+++ b/tests/integrations/celery/test_celery.py
@@ -74,6 +74,19 @@ def inner(propagate_traces=True, backend="always_eager", **kwargs):
     return inner
 
 
+@pytest.fixture
+def init_celery_producer_only():
+    """
+    Initialize Celery without the worker. We mock out the actual
+    publishing of messages to the broker, so tasks are not actually executed.
+    This allows testing without `always_eager`.
+
+    Unlike `init_celery`, this fixture does not initialize sentry_sdk.
+    """
+    with mock.patch("kombu.messaging.Producer.publish"):
+        yield Celery
+
+
 @pytest.fixture
 def celery(init_celery):
     return init_celery()
@@ -722,3 +735,27 @@ def task(): ...
     (event,) = events
     (span,) = event["spans"]
     assert span["data"]["messaging.system"] == system
+
+
+@pytest.mark.forked
+@pytest.mark.parametrize("system", ("amqp", "redis"))
+def test_producer_span_data(
+    system, sentry_init, init_celery_producer_only, capture_events
+):
+    sentry_init(integrations=[CeleryIntegration()], enable_tracing=True)
+    celery = init_celery_producer_only(__name__, broker=f"{system}://example.com")
+    events = capture_events()
+
+    @celery.task()
+    def task(): ...
+
+    with start_transaction():
+        task.apply_async()
+
+    (event,) = events
+    span = next(span for span in event["spans"] if span["op"] == "queue.publish")
+    assert span["data"]["messaging.system"] == system
+
+    assert span["data"]["messaging.destination.name"] == "celery"
+    assert "messaging.message.id" in span["data"]
+    assert span["data"]["messaging.message.retry.count"] == 0