From 59ffea51e8699e9f303e88f7e00f6291aadedcf4 Mon Sep 17 00:00:00 2001 From: Maxwell Weru Date: Thu, 6 Jun 2024 19:16:32 +0300 Subject: [PATCH] Set status and exception in Activity instances --- .../AmazonSqsTransport.cs | 7 +- .../AzureEventHubsTransport.cs | 7 +- .../AzureQueueStorageTransport.cs | 7 +- .../AzureServiceBusTransport.cs | 5 ++ .../InMemoryTransport.cs | 7 +- .../KafkaTransport.cs | 7 +- .../RabbitMqTransport.cs | 5 ++ src/Tingle.EventBus/EventBus.cs | 76 +++++++++++++------ .../Extensions/ActivityExtensions.cs | 59 ++++++++++++++ 9 files changed, 153 insertions(+), 27 deletions(-) create mode 100644 src/Tingle.EventBus/Extensions/ActivityExtensions.cs diff --git a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs index f7a95fe6..b0ffe80c 100644 --- a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs +++ b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs @@ -425,7 +425,12 @@ private async Task OnMessageReceivedAsync(IServiceScope scope, EventRegistration Logger.ReceivedMessage(messageId: messageId, eventBusId: context.Id, queueUrl: queueUrl); - var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + if (ex != null) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.AddException(ex); + } // dead-letter cannot be dead-lettered again, what else can we do? if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs index 8b675b8f..4350bce8 100644 --- a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs @@ -406,7 +406,12 @@ private async Task OnEventReceivedAsync(EventRegistration reg, EventConsumerRegi .SetPartitionContext(args.Partition) .SetEventData(data); - var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + if (ex != null) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.AddException(ex); + } // dead-letter cannot be dead-lettered again, what else can we do? if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails diff --git a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs index d9df6110..8f636114 100644 --- a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs @@ -321,7 +321,12 @@ private async Task OnMessageReceivedAsync(IServiceScope scope, EventRegistration activity?.SetParentId(parentId: parentActivityId); } - var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + if (ex != null) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.AddException(ex); + } // dead-letter cannot be dead-lettered again, what else can we do? if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails diff --git a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs index 6c3af524..94cb137c 100644 --- a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs @@ -549,6 +549,11 @@ private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRe } var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + if (ex != null) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.AddException(ex); + } // Decide the action to execute then execute var action = DecideAction(successful, ecr.UnhandledErrorBehaviour, processor.AutoCompleteMessages); diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs index 38931020..3f0435f4 100644 --- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs +++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs @@ -352,7 +352,12 @@ private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRe // set the extras context.SetInMemoryReceivedMessage(message); - var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + if (ex != null) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.AddException(ex); + } // Add to Consumed/Failed list if (successful) consumed.Add(context); diff --git a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs index cc1c6dad..c3aacb28 100644 --- a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs +++ b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs @@ -309,7 +309,12 @@ private async Task OnEventReceivedAsync(EventRegistration reg, EventConsumerRegi partition: result.Partition, offset: result.Offset); - var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + if (ex != null) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.AddException(ex); + } // dead-letter cannot be dead-lettered again, what else can we do? if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails diff --git a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs index 2cbeef73..e03eeb41 100644 --- a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs +++ b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs @@ -310,6 +310,11 @@ private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRe messageId, context.Id); var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); + if (ex != null) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.AddException(ex); + } // Decide the action to execute then execute var action = DecideAction(successful, ecr.UnhandledErrorBehaviour); diff --git a/src/Tingle.EventBus/EventBus.cs b/src/Tingle.EventBus/EventBus.cs index 03bcc28b..d8573efc 100644 --- a/src/Tingle.EventBus/EventBus.cs +++ b/src/Tingle.EventBus/EventBus.cs @@ -30,9 +30,7 @@ public class EventBus(EventBusTransportProvider transportProvider, private readonly IReadOnlyDictionary transports = transportProvider.GetTransports(); - /// - /// Publish an event. - /// + /// Publish an event. /// The event type. /// The event to publish. /// @@ -74,15 +72,23 @@ public class EventBus(EventBusTransportProvider transportProvider, activity?.AddTag(ActivityTagNames.MessagingConversationId, @event.CorrelationId); // Publish on the transport - return await transport.PublishAsync(@event: @event, - registration: reg, - scheduled: scheduled, - cancellationToken: cancellationToken).ConfigureAwait(false); + try + { + return await transport.PublishAsync(@event: @event, + registration: reg, + scheduled: scheduled, + cancellationToken: cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.AddException(ex); + + throw; + } } - /// - /// Publish a batch of events. - /// + /// Publish a batch of events. /// The event type. /// The events to publish. /// @@ -127,16 +133,24 @@ public class EventBus(EventBusTransportProvider transportProvider, activity?.AddTag(ActivityTagNames.MessagingConversationId, string.Join(",", events.Select(e => e.CorrelationId))); // Publish on the transport - return await transport.PublishAsync(events: events, - registration: reg, - scheduled: scheduled, - cancellationToken: cancellationToken).ConfigureAwait(false); + try + { + return await transport.PublishAsync(events: events, + registration: reg, + scheduled: scheduled, + cancellationToken: cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.AddException(ex); + + throw; + } } - /// - /// Cancel a scheduled event. - /// + /// Cancel a scheduled event. /// The event type. /// The scheduling identifier of the scheduled event. /// @@ -155,12 +169,20 @@ public class EventBus(EventBusTransportProvider transportProvider, activity?.AddTag(ActivityTagNames.MessagingSystem, transport.Name); // Cancel on the transport - await transport.CancelAsync(id: id, registration: reg, cancellationToken: cancellationToken).ConfigureAwait(false); + try + { + await transport.CancelAsync(id: id, registration: reg, cancellationToken: cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.AddException(ex); + + throw; + } } - /// - /// Cancel a batch of scheduled events. - /// + /// Cancel a batch of scheduled events. /// The event type. /// The scheduling identifiers of the scheduled events. /// @@ -179,7 +201,17 @@ public class EventBus(EventBusTransportProvider transportProvider, activity?.AddTag(ActivityTagNames.MessagingSystem, transport.Name); // Cancel on the transport - await transport.CancelAsync(ids: ids, registration: reg, cancellationToken: cancellationToken).ConfigureAwait(false); + try + { + await transport.CancelAsync(ids: ids, registration: reg, cancellationToken: cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.AddException(ex); + + throw; + } } /// diff --git a/src/Tingle.EventBus/Extensions/ActivityExtensions.cs b/src/Tingle.EventBus/Extensions/ActivityExtensions.cs new file mode 100644 index 00000000..20357a27 --- /dev/null +++ b/src/Tingle.EventBus/Extensions/ActivityExtensions.cs @@ -0,0 +1,59 @@ +namespace System.Diagnostics; + +/// +public static class ActivityExtensions +{ + // Copied from https://github.com/dotnet/runtime/pull/102905/files +#if !NET9_0_OR_GREATER + /// + public static Activity AddException(this Activity activity, Exception exception, TagList tags = default, DateTimeOffset timestamp = default) + { + if (activity is null) throw new ArgumentNullException(nameof(activity)); + if (exception is null) throw new ArgumentNullException(nameof(exception)); + + TagList exceptionTags = tags; + + const string ExceptionEventName = "exception"; + const string ExceptionMessageTag = "exception.message"; + const string ExceptionStackTraceTag = "exception.stacktrace"; + const string ExceptionTypeTag = "exception.type"; + + bool hasMessage = false; + bool hasStackTrace = false; + bool hasType = false; + + for (int i = 0; i < exceptionTags.Count; i++) + { + if (exceptionTags[i].Key == ExceptionMessageTag) + { + hasMessage = true; + } + else if (exceptionTags[i].Key == ExceptionStackTraceTag) + { + hasStackTrace = true; + } + else if (exceptionTags[i].Key == ExceptionTypeTag) + { + hasType = true; + } + } + + if (!hasMessage) + { + exceptionTags.Add(new KeyValuePair(ExceptionMessageTag, exception.Message)); + } + + if (!hasStackTrace) + { + exceptionTags.Add(new KeyValuePair(ExceptionStackTraceTag, exception.ToString())); + } + + if (!hasType) + { + exceptionTags.Add(new KeyValuePair(ExceptionTypeTag, exception.GetType().ToString())); + } + + return activity.AddEvent(new ActivityEvent(ExceptionEventName, timestamp)); + } +#endif +}