Skip to content

Commit

Permalink
feat: AWS-SDK SNS Context propagation (#728)
Browse files Browse the repository at this point in the history
  • Loading branch information
habmic authored Nov 12, 2021
1 parent 7a7d3a3 commit 78cd4e1
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 56 deletions.
5 changes: 1 addition & 4 deletions plugins/node/opentelemetry-instrumentation-aws-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ AWS contains dozens of services accessible with the JS SDK. For many services, t
Specific service logic currently implemented for:

- [SQS](./docs/sqs.md)
- [SNS](./docs/sns.md)
- DynamoDb

---

This instrumentation is a work in progress. We implemented some of the specific trace semantics for some of the services, and strive to support more services and extend the already supported services in the future. You can [Open an Issue](https://github.com/aspecto-io/opentelemetry-ext-js/issues), or [Submit a Pull Request](https://github.com/aspecto-io/opentelemetry-ext-js/pulls) if you want to contribute.

## Potential Side Effects

The instrumentation is doing best effort to support the trace specification of OpenTelemetry. For SQS, it involves defining new attributes on the `Messages` array, as well as on the manipulated types generated from this array (to set correct trace context for a single SQS message operation). Those properties are defined as [non-enumerable](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Enumerability_and_ownership_of_properties) properties, so they have minimum side effect on the app. They will, however, show when using the `Object.getOwnPropertyDescriptors` and `Reflect.ownKeys` functions on SQS `Messages` array and for each `Message` in the array.
Expand Down
15 changes: 15 additions & 0 deletions plugins/node/opentelemetry-instrumentation-aws-sdk/doc/sns.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# SNS

SNS is amazon's managed pub/sub system. Thus, it should follow the [OpenTelemetry specification for Messaging systems](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md).

## Specific trace semantic

The following methods are automatically enhanced:

### Publish messages

- [Messaging Attributes](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#messaging-attributes) are added by this instrumentation according to the spec.
- OpenTelemetry trace context is injected as SNS MessageAttributes, so the service receiving the message can link cascading spans to the trace which created the message.

### Consumers
There are many potential consumers: SQS, Lambda, HTTP/S, Email, SMS, mobile notifications. each one of them will received the propagated context in its own way.
14 changes: 8 additions & 6 deletions plugins/node/opentelemetry-instrumentation-aws-sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,23 @@
"@opentelemetry/sdk-trace-base": "1.0.0",
"@types/mocha": "^8.2.2",
"@types/node": "^14.0.0",
"@types/sinon": "^10.0.6",
"aws-sdk": "2.1008.0",
"eslint": "^7.32.0",
"expect": "^25",
"mocha": "7.2.0",
"ts-mocha": "8.0.0",
"nock": "^13.0.11",
"nyc": "^15.1.0",
"rimraf": "^3.0.2",
"sinon": "^12.0.0",
"gts": "3.1.0",
"@opentelemetry/contrib-test-utils": "^0.27.0",
"test-all-versions": "^5.0.1",
"ts-mocha": "8.0.0",
"ts-node": "^9.1.1",
"typescript": "4.3.4",
"eslint": "^7.32.0",
"nyc": "^15.1.0",
"rimraf": "^3.0.2"
"typescript": "4.3.4"
},
"engines": {
"node": ">=8.5.0"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
TextMapGetter,
TextMapSetter,
context,
propagation,
diag,
} from '@opentelemetry/api';
import type { SQS, SNS } from 'aws-sdk';

// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html
export const MAX_MESSAGE_ATTRIBUTES = 10;
class ContextSetter
implements
TextMapSetter<SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap>
{
set(
carrier: SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap,
key: string,
value: string
) {
carrier[key] = {
DataType: 'String',
StringValue: value as string,
};
}
}
export const contextSetter = new ContextSetter();

class ContextGetter
implements
TextMapGetter<SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap>
{
keys(
carrier: SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap
): string[] {
return Object.keys(carrier);
}

get(
carrier: SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap,
key: string
): undefined | string | string[] {
return carrier?.[key]?.StringValue;
}
}
export const contextGetter = new ContextGetter();

export const injectPropagationContext = (
attributesMap?: SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap
): SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap => {
const attributes = attributesMap ?? {};
if (Object.keys(attributes).length < MAX_MESSAGE_ATTRIBUTES) {
propagation.inject(context.active(), attributes, contextSetter);
} else {
diag.warn(
'aws-sdk instrumentation: cannot set context propagation on SQS/SNS message due to maximum amount of MessageAttributes'
);
}
return attributes;
};
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import {
NormalizedResponse,
} from '../types';
import { DynamodbServiceExtension } from './dynamodb';
import { SnsServiceExtension } from './sns';

export class ServicesExtensions implements ServiceExtension {
services: Map<string, ServiceExtension> = new Map();

constructor() {
this.services.set('SQS', new SqsServiceExtension());
this.services.set('SNS', new SnsServiceExtension());
this.services.set('DynamoDB', new DynamodbServiceExtension());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Span, Tracer, SpanKind } from '@opentelemetry/api';
import {
MessagingDestinationKindValues,
SemanticAttributes,
} from '@opentelemetry/semantic-conventions';
import {
NormalizedRequest,
NormalizedResponse,
AwsSdkInstrumentationConfig,
} from '../types';
import { injectPropagationContext } from './MessageAttributes';
import { RequestMetadata, ServiceExtension } from './ServiceExtension';

export class SnsServiceExtension implements ServiceExtension {
requestPreSpanHook(request: NormalizedRequest): RequestMetadata {
let spanKind: SpanKind = SpanKind.CLIENT;
let spanName = `SNS ${request.commandName}`;
const spanAttributes = {
[SemanticAttributes.MESSAGING_SYSTEM]: 'aws.sns',
};

if (request.commandName === 'Publish') {
spanKind = SpanKind.PRODUCER;

spanAttributes[SemanticAttributes.MESSAGING_DESTINATION_KIND] =
MessagingDestinationKindValues.TOPIC;
const { TopicArn, TargetArn, PhoneNumber } = request.commandInput;
spanAttributes[SemanticAttributes.MESSAGING_DESTINATION] =
this.extractDestinationName(TopicArn, TargetArn, PhoneNumber);

spanName = `${spanAttributes[SemanticAttributes.MESSAGING_DESTINATION]} ${
request.commandName
}`;
}

return {
isIncoming: false,
spanAttributes,
spanKind,
spanName,
};
}

requestPostSpanHook(request: NormalizedRequest): void {
if (request.commandName === 'Publish') {
const origMessageAttributes =
request.commandInput['MessageAttributes'] ?? {};
if (origMessageAttributes) {
request.commandInput['MessageAttributes'] = injectPropagationContext(
origMessageAttributes
);
}
}
}

responseHook(
response: NormalizedResponse,
span: Span,
tracer: Tracer,
config: AwsSdkInstrumentationConfig
): void {}

extractDestinationName(
topicArn: string,
targetArn: string,
phoneNumber: string
): string {
if (topicArn || targetArn) {
const arn = topicArn ?? targetArn;
try {
return arn.substr(arn.lastIndexOf(':') + 1);
} catch (err) {
return arn;
}
} else if (phoneNumber) {
return phoneNumber;
} else {
return 'unknown';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ import {
SpanKind,
Span,
propagation,
diag,
TextMapGetter,
TextMapSetter,
trace,
context,
ROOT_CONTEXT,
Expand All @@ -37,32 +34,7 @@ import {
MessagingDestinationKindValues,
SemanticAttributes,
} from '@opentelemetry/semantic-conventions';

// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html
const SQS_MAX_MESSAGE_ATTRIBUTES = 10;
class SqsContextSetter implements TextMapSetter<SQS.MessageBodyAttributeMap> {
set(carrier: SQS.MessageBodyAttributeMap, key: string, value: string) {
carrier[key] = {
DataType: 'String',
StringValue: value as string,
};
}
}
const sqsContextSetter = new SqsContextSetter();

class SqsContextGetter implements TextMapGetter<SQS.MessageBodyAttributeMap> {
keys(carrier: SQS.MessageBodyAttributeMap): string[] {
return Object.keys(carrier);
}

get(
carrier: SQS.MessageBodyAttributeMap,
key: string
): undefined | string | string[] {
return carrier?.[key]?.StringValue;
}
}
const sqsContextGetter = new SqsContextGetter();
import { contextGetter, injectPropagationContext } from './MessageAttributes';

export class SqsServiceExtension implements ServiceExtension {
requestPreSpanHook(request: NormalizedRequest): RequestMetadata {
Expand Down Expand Up @@ -118,7 +90,7 @@ export class SqsServiceExtension implements ServiceExtension {
request.commandInput['MessageAttributes'] ?? {};
if (origMessageAttributes) {
request.commandInput['MessageAttributes'] =
this.InjectPropagationContext(origMessageAttributes);
injectPropagationContext(origMessageAttributes);
}
}
break;
Expand All @@ -127,7 +99,7 @@ export class SqsServiceExtension implements ServiceExtension {
{
request.commandInput?.Entries?.forEach(
(messageParams: SQS.SendMessageBatchRequestEntry) => {
messageParams.MessageAttributes = this.InjectPropagationContext(
messageParams.MessageAttributes = injectPropagationContext(
messageParams.MessageAttributes ?? {}
);
}
Expand Down Expand Up @@ -157,7 +129,7 @@ export class SqsServiceExtension implements ServiceExtension {
parentContext: propagation.extract(
ROOT_CONTEXT,
message.MessageAttributes,
sqsContextGetter
contextGetter
),
attributes: {
[SemanticAttributes.MESSAGING_SYSTEM]: 'aws.sqs',
Expand Down Expand Up @@ -193,18 +165,4 @@ export class SqsServiceExtension implements ServiceExtension {

return segments[segments.length - 1];
};

InjectPropagationContext(
attributesMap?: SQS.MessageBodyAttributeMap
): SQS.MessageBodyAttributeMap {
const attributes = attributesMap ?? {};
if (Object.keys(attributes).length < SQS_MAX_MESSAGE_ATTRIBUTES) {
propagation.inject(context.active(), attributes, sqsContextSetter);
} else {
diag.warn(
'aws-sdk instrumentation: cannot set context propagation on SQS message due to maximum amount of MessageAttributes'
);
}
return attributes;
}
}
Loading

0 comments on commit 78cd4e1

Please sign in to comment.