Skip to content

Commit

Permalink
Create receive messaging span when inside transaction (#1308)
Browse files Browse the repository at this point in the history
This commit updates the Azure Service Bus integrations
to create messaging spans for receiving messages from
a queue or subscription when inside a traced transaction.
  • Loading branch information
russcam authored May 27, 2021
1 parent 4591f02 commit a986d18
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,29 @@ private void OnReceiveStart(KeyValuePair<string, object> kv, string action)
? $"{ServiceBus.SegmentName} {action}"
: $"{ServiceBus.SegmentName} {action} from {queueName}";

var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };
IExecutionSegment segment;
if (ApmAgent.Tracer.CurrentTransaction is null)
{
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };
segment = transaction;
}
else
{
var span = ApmAgent.GetCurrentExecutionSegment().StartSpan(transactionName, ApiConstants.TypeMessaging, ServiceBus.SubType, action);
segment = span;
}

// transaction creation will create an activity, so use this as the key.
var activityId = Activity.Current.Id;

if (!_processingSegments.TryAdd(activityId, transaction))
if (!_processingSegments.TryAdd(activityId, segment))
{
Logger.Error()?.Log(
"Could not add {Action} transaction {TransactionId} for activity {ActivityId} to tracked segments",
Logger.Trace()?.Log(
"Could not add {Action} {SegmentName} {TransactionId} for activity {ActivityId} to tracked segments",
action,
transaction.Id,
segment is ITransaction ? "transaction" : "span",
segment.Id,
activityId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,29 @@ private void OnReceiveStart(KeyValuePair<string, object> kv, string action, Prop
? $"{ServiceBus.SegmentName} {action}"
: $"{ServiceBus.SegmentName} {action} from {queueName}";

var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };
IExecutionSegment segment;
if (ApmAgent.Tracer.CurrentTransaction is null)
{
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };
segment = transaction;
}
else
{
var span = ApmAgent.GetCurrentExecutionSegment().StartSpan(transactionName, ApiConstants.TypeMessaging, ServiceBus.SubType, action);
segment = span;
}

// transaction creation will create an activity, so use this as the key.
var activityId = Activity.Current.Id;

if (!_processingSegments.TryAdd(activityId, transaction))
if (!_processingSegments.TryAdd(activityId, segment))
{
Logger.Trace()?.Log(
"Could not add {Action} transaction {TransactionId} for activity {ActivityId} to tracked segments",
"Could not add {Action} {SegmentName} {TransactionId} for activity {ActivityId} to tracked segments",
action,
transaction.Id,
segment is ITransaction ? "transaction" : "span",
segment.Id,
activityId);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
Expand Down Expand Up @@ -156,6 +157,33 @@ await sender.ScheduleMessageAsync(
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);
}

[AzureCredentialsFact]
public async Task Capture_Span_When_Receive_From_Queue_Inside_Transaction()
{
await using var scope = await QueueScope.CreateWithQueue(_adminClient);
var sender = _client.CreateSender(scope.QueueName);
var receiver = _client.CreateReceiver(scope.QueueName);

await sender.SendMessageAsync(
new ServiceBusMessage("test message")).ConfigureAwait(false);

await _agent.Tracer.CaptureTransaction("Receive messages", ApiConstants.TypeMessaging, async t =>
{
await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
});


if (!_sender.WaitForSpans(TimeSpan.FromMinutes(2)))
throw new Exception("No span received in timeout");

_sender.Spans.Should().HaveCount(1);
var span = _sender.SpansOnFirstTransaction.First();

span.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}");
span.Type.Should().Be(ApiConstants.TypeMessaging);
span.Subtype.Should().Be(ServiceBus.SubType);
}

[AzureCredentialsFact]
public async Task Capture_Transaction_When_Receive_From_Queue()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;
Expand Down Expand Up @@ -156,6 +157,32 @@ await sender.ScheduleMessageAsync(
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);
}

[AzureCredentialsFact]
public async Task Capture_Span_When_Receive_From_Queue_Inside_Transaction()
{
await using var scope = await QueueScope.CreateWithQueue(_adminClient);
var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName);
var receiver = new MessageReceiver(_environment.ServiceBusConnectionString, scope.QueueName, ReceiveMode.PeekLock);

await sender.SendAsync(
new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false);

await _agent.Tracer.CaptureTransaction("Receive messages", ApiConstants.TypeMessaging, async t =>
{
await receiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
});

if (!_sender.WaitForSpans(TimeSpan.FromMinutes(2)))
throw new Exception("No span received in timeout");

_sender.Spans.Should().HaveCount(1);
var span = _sender.SpansOnFirstTransaction.First();

span.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}");
span.Type.Should().Be(ApiConstants.TypeMessaging);
span.Subtype.Should().Be(ServiceBus.SubType);
}

[AzureCredentialsFact]
public async Task Capture_Transaction_When_Receive_From_Queue()
{
Expand Down

0 comments on commit a986d18

Please sign in to comment.