diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts index 633cfd49cb..6d3ca17f76 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts @@ -121,45 +121,61 @@ export class SqsServiceExtension implements ServiceExtension { tracer: Tracer, config: AwsSdkInstrumentationConfig ) => { - const messages: SQS.Message[] = response?.data?.Messages; - if (messages) { - const queueUrl = this.extractQueueUrl(response.request.commandInput); - const queueName = this.extractQueueNameFromUrl(queueUrl); - - pubsubPropagation.patchMessagesArrayToStartProcessSpans({ - messages, - parentContext: trace.setSpan(context.active(), span), - tracer, - messageToSpanDetails: (message: SQS.Message) => ({ - name: queueName ?? 'unknown', - parentContext: propagation.extract( - ROOT_CONTEXT, - extractPropagationContext( - message, - config.sqsExtractContextPropagationFromPayload - ), - contextGetter - ), - attributes: { - [SemanticAttributes.MESSAGING_SYSTEM]: 'aws.sqs', - [SemanticAttributes.MESSAGING_DESTINATION]: queueName, - [SemanticAttributes.MESSAGING_DESTINATION_KIND]: - MessagingDestinationKindValues.QUEUE, - [SemanticAttributes.MESSAGING_MESSAGE_ID]: message.MessageId, - [SemanticAttributes.MESSAGING_URL]: queueUrl, - [SemanticAttributes.MESSAGING_OPERATION]: - MessagingOperationValues.PROCESS, - }, - }), - processHook: (span: Span, message: SQS.Message) => - config.sqsProcessHook?.(span, { message }), - }); - - pubsubPropagation.patchArrayForProcessSpans( - messages, - tracer, - context.active() - ); + switch (response.request.commandName) { + case 'SendMessage': + span.setAttribute( + SemanticAttributes.MESSAGING_MESSAGE_ID, + response?.data?.MessageId + ); + break; + + case 'SendMessageBatch': + // TODO: How should this be handled? + break; + + case 'ReceiveMessage': { + const messages: SQS.Message[] = response?.data?.Messages; + if (messages) { + const queueUrl = this.extractQueueUrl(response.request.commandInput); + const queueName = this.extractQueueNameFromUrl(queueUrl); + + pubsubPropagation.patchMessagesArrayToStartProcessSpans({ + messages, + parentContext: trace.setSpan(context.active(), span), + tracer, + messageToSpanDetails: (message: SQS.Message) => ({ + name: queueName ?? 'unknown', + parentContext: propagation.extract( + ROOT_CONTEXT, + extractPropagationContext( + message, + config.sqsExtractContextPropagationFromPayload + ), + contextGetter + ), + attributes: { + [SemanticAttributes.MESSAGING_SYSTEM]: 'aws.sqs', + [SemanticAttributes.MESSAGING_DESTINATION]: queueName, + [SemanticAttributes.MESSAGING_DESTINATION_KIND]: + MessagingDestinationKindValues.QUEUE, + [SemanticAttributes.MESSAGING_MESSAGE_ID]: message.MessageId, + [SemanticAttributes.MESSAGING_URL]: queueUrl, + [SemanticAttributes.MESSAGING_OPERATION]: + MessagingOperationValues.PROCESS, + }, + }), + processHook: (span: Span, message: SQS.Message) => + config.sqsProcessHook?.(span, { message }), + }); + + pubsubPropagation.patchArrayForProcessSpans( + messages, + tracer, + context.active() + ); + } + break; + } } }; diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/aws-sdk-v3.test.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/aws-sdk-v3.test.ts index 84a14c6c78..cefd73984f 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/aws-sdk-v3.test.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/aws-sdk-v3.test.ts @@ -293,7 +293,7 @@ describe('instrumentation-aws-sdk-v3', () => { 'https://sqs.us-east-1.amazonaws.com/731241200085/otel-demo-aws-sdk', MessageBody: 'payload example from v3 without batch', }; - await sqsClient.sendMessage(params); + const response = await sqsClient.sendMessage(params); expect(getTestSpans().length).toBe(1); const [span] = getTestSpans(); @@ -320,6 +320,9 @@ describe('instrumentation-aws-sdk-v3', () => { expect(span.attributes[SemanticAttributes.MESSAGING_URL]).toEqual( params.QueueUrl ); + expect( + span.attributes[SemanticAttributes.MESSAGING_MESSAGE_ID] + ).toEqual(response.MessageId); expect(span.attributes[SemanticAttributes.HTTP_STATUS_CODE]).toEqual( 200 ); diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/sqs.test.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/sqs.test.ts index a23f3514b3..901b4fca1d 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/sqs.test.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/sqs.test.ts @@ -25,6 +25,7 @@ import * as AWS from 'aws-sdk'; import { AWSError } from 'aws-sdk'; import { + MessagingDestinationKindValues, MessagingOperationValues, SemanticAttributes, } from '@opentelemetry/semantic-conventions'; @@ -41,11 +42,13 @@ import { Message } from 'aws-sdk/clients/sqs'; import * as expect from 'expect'; import * as sinon from 'sinon'; import * as messageAttributes from '../src/services/MessageAttributes'; +import { AttributeNames } from '../src/enums'; const responseMockSuccess = { requestId: '0000000000000', error: null, -}; + httpResponse: { statusCode: 200 }, +} as AWS.Response; const extractContextSpy = sinon.spy( messageAttributes, @@ -361,6 +364,49 @@ describe('SQS', () => { }); describe('hooks', () => { + it('sqsResponseHook for sendMessage should add messaging attributes', async () => { + const region = 'us-east-1'; + const sqs = new AWS.SQS(); + sqs.config.update({ region }); + + const QueueName = 'unittest'; + const params = { + QueueUrl: `queue/url/for/${QueueName}`, + MessageBody: 'payload example from v2 without batch', + }; + + const response = await sqs.sendMessage(params).promise(); + + expect(getTestSpans().length).toBe(1); + const [span] = getTestSpans(); + + // make sure we have the general aws attributes: + expect(span.attributes[SemanticAttributes.RPC_SYSTEM]).toEqual('aws-api'); + expect(span.attributes[SemanticAttributes.RPC_METHOD]).toEqual( + 'SendMessage' + ); + expect(span.attributes[SemanticAttributes.RPC_SERVICE]).toEqual('SQS'); + expect(span.attributes[AttributeNames.AWS_REGION]).toEqual(region); + + // custom messaging attributes + expect(span.attributes[SemanticAttributes.MESSAGING_SYSTEM]).toEqual( + 'aws.sqs' + ); + expect( + span.attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND] + ).toEqual(MessagingDestinationKindValues.QUEUE); + expect(span.attributes[SemanticAttributes.MESSAGING_DESTINATION]).toEqual( + QueueName + ); + expect(span.attributes[SemanticAttributes.MESSAGING_URL]).toEqual( + params.QueueUrl + ); + expect(span.attributes[SemanticAttributes.MESSAGING_MESSAGE_ID]).toEqual( + response.MessageId + ); + expect(span.attributes[SemanticAttributes.HTTP_STATUS_CODE]).toEqual(200); + }); + it('sqsProcessHook called and add message attribute to span', async () => { const config = { sqsProcessHook: (