Skip to content

Commit

Permalink
Create receive messaging span when inside transaction
Browse files Browse the repository at this point in the history
This commit updates the Azure Service Bus integrations
to create messaging spans for receiving messages from
a queue or subscription when inside a traced transaction.
  • Loading branch information
russcam committed May 26, 2021
1 parent b6b679c commit e92e014
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,29 @@ private void OnReceiveStart(KeyValuePair<string, object> kv, string action)
? $"{ServiceBus.SegmentName} {action}"
: $"{ServiceBus.SegmentName} {action} from {queueName}";

var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };
IExecutionSegment segment;
if (ApmAgent.Tracer.CurrentTransaction is null)
{
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };
segment = transaction;
}
else
{
var span = ApmAgent.GetCurrentExecutionSegment().StartSpan(transactionName, ApiConstants.TypeMessaging, ServiceBus.SubType, action);
segment = span;
}

// transaction creation will create an activity, so use this as the key.
var activityId = Activity.Current.Id;

if (!_processingSegments.TryAdd(activityId, transaction))
if (!_processingSegments.TryAdd(activityId, segment))
{
Logger.Error()?.Log(
"Could not add {Action} transaction {TransactionId} for activity {ActivityId} to tracked segments",
Logger.Trace()?.Log(
"Could not add {Action} {SegmentName} {TransactionId} for activity {ActivityId} to tracked segments",
action,
transaction.Id,
segment is ITransaction ? "transaction" : "span",
segment.Id,
activityId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,29 @@ private void OnReceiveStart(KeyValuePair<string, object> kv, string action, Prop
? $"{ServiceBus.SegmentName} {action}"
: $"{ServiceBus.SegmentName} {action} from {queueName}";

var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };
IExecutionSegment segment;
if (ApmAgent.Tracer.CurrentTransaction is null)
{
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };
segment = transaction;
}
else
{
var span = ApmAgent.GetCurrentExecutionSegment().StartSpan(transactionName, ApiConstants.TypeMessaging, ServiceBus.SubType, action);
segment = span;
}

// transaction creation will create an activity, so use this as the key.
var activityId = Activity.Current.Id;

if (!_processingSegments.TryAdd(activityId, transaction))
if (!_processingSegments.TryAdd(activityId, segment))
{
Logger.Trace()?.Log(
"Could not add {Action} transaction {TransactionId} for activity {ActivityId} to tracked segments",
"Could not add {Action} {SegmentName} {TransactionId} for activity {ActivityId} to tracked segments",
action,
transaction.Id,
segment is ITransaction ? "transaction" : "span",
segment.Id,
activityId);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
Expand Down Expand Up @@ -156,6 +157,33 @@ await sender.ScheduleMessageAsync(
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);
}

[AzureCredentialsFact]
public async Task Capture_Span_When_Receive_From_Queue_Inside_Transaction()
{
await using var scope = await QueueScope.CreateWithQueue(_adminClient);
var sender = _client.CreateSender(scope.QueueName);
var receiver = _client.CreateReceiver(scope.QueueName);

await sender.SendMessageAsync(
new ServiceBusMessage("test message")).ConfigureAwait(false);

await _agent.Tracer.CaptureTransaction("Receive messages", ApiConstants.TypeMessaging, async t =>
{
await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
});


if (!_sender.WaitForSpans(TimeSpan.FromMinutes(2)))
throw new Exception("No span received in timeout");

_sender.Spans.Should().HaveCount(1);
var span = _sender.SpansOnFirstTransaction.First();

span.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}");
span.Type.Should().Be(ApiConstants.TypeMessaging);
span.Subtype.Should().Be(ServiceBus.SubType);
}

[AzureCredentialsFact]
public async Task Capture_Transaction_When_Receive_From_Queue()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;
Expand Down Expand Up @@ -156,6 +157,32 @@ await sender.ScheduleMessageAsync(
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);
}

[AzureCredentialsFact]
public async Task Capture_Span_When_Receive_From_Queue_Inside_Transaction()
{
await using var scope = await QueueScope.CreateWithQueue(_adminClient);
var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName);
var receiver = new MessageReceiver(_environment.ServiceBusConnectionString, scope.QueueName, ReceiveMode.PeekLock);

await sender.SendAsync(
new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false);

await _agent.Tracer.CaptureTransaction("Receive messages", ApiConstants.TypeMessaging, async t =>
{
await receiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
});

if (!_sender.WaitForSpans(TimeSpan.FromMinutes(2)))
throw new Exception("No span received in timeout");

_sender.Spans.Should().HaveCount(1);
var span = _sender.SpansOnFirstTransaction.First();

span.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}");
span.Type.Should().Be(ApiConstants.TypeMessaging);
span.Subtype.Should().Be(ServiceBus.SubType);
}

[AzureCredentialsFact]
public async Task Capture_Transaction_When_Receive_From_Queue()
{
Expand Down

0 comments on commit e92e014

Please sign in to comment.