From 76db007f270a646e8570768fa827ea2a97b62cbc Mon Sep 17 00:00:00 2001 From: Seth Maxwell Date: Fri, 14 Aug 2020 14:00:18 -0400 Subject: [PATCH] feat: Opentelemetry integration (#1078) * Add opentelemetry tracing * build: rename _toc to toc (#1066) * changes without context autosynth cannot find the source of changes triggered by earlier changes in this repository, or by version upgrades to tools such as linters. * fix: rename _toc to toc Source-Author: F. Hinkelmann Source-Date: Tue Jul 21 10:53:20 2020 -0400 Source-Repo: googleapis/synthtool Source-Sha: 99c93fe09f8c1dca09dfc0301c8668e3a70dd796 Source-Link: https://github.com/googleapis/synthtool/commit/99c93fe09f8c1dca09dfc0301c8668e3a70dd796 Co-authored-by: sofisl <55454395+sofisl@users.noreply.github.com> * build: move gitattributes files to node templates (#1070) Source-Author: F. Hinkelmann Source-Date: Thu Jul 23 01:45:04 2020 -0400 Source-Repo: googleapis/synthtool Source-Sha: 3a00b7fea8c4c83eaff8eb207f530a2e3e8e1de3 Source-Link: https://github.com/googleapis/synthtool/commit/3a00b7fea8c4c83eaff8eb207f530a2e3e8e1de3 * Add opentelemetry instrumentation * Add create span test * Refactor tracing * Add publisher key test * Fix linting issues * Add docs * Add example for opentelemetry * Add tracing example * Update headers * Add microsoft api documenter * Fix linting in samples/package.json * Add optional tracing * Fix linting issues * Re-add api-documenter * Update package.json * Update package.json * Update package.json * Fix docs * Add more unit tests * Fix linting * Add disable tracing tests * Update opentelemetryTracing sample Co-authored-by: Yoshi Automation Bot Co-authored-by: sofisl <55454395+sofisl@users.noreply.github.com> Co-authored-by: Benjamin E. Coe Co-authored-by: Megan Potter <57276408+feywind@users.noreply.github.com> --- package.json | 8 ++- samples/opentelemetryTracing.js | 108 ++++++++++++++++++++++++++++++++ samples/package.json | 2 + src/opentelemetry-tracing.ts | 43 +++++++++++++ src/publisher/index.ts | 69 ++++++++++++++++++-- src/subscriber.ts | 44 ++++++++++++- test/opentelemetry-tracing.ts | 63 +++++++++++++++++++ test/publisher/index.ts | 80 ++++++++++++++++++++++- test/subscriber.ts | 103 ++++++++++++++++++++++++++++++ 9 files changed, 509 insertions(+), 11 deletions(-) create mode 100644 samples/opentelemetryTracing.js create mode 100644 src/opentelemetry-tracing.ts create mode 100644 test/opentelemetry-tracing.ts diff --git a/package.json b/package.json index 83cbebc75..bdb076f47 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,7 @@ "presystem-test": "npm run compile", "system-test": "mocha build/system-test --timeout 600000", "samples-test": "cd samples/ && npm link ../ && npm install && npm test && cd ../", - "test": "c8 mocha build/test", + "test": "c8 mocha build/test --recursive", "lint": "gts check", "predocs": "npm run compile", "docs": "jsdoc -c .jsdoc.js", @@ -44,15 +44,17 @@ "predocs-test": "npm run docs", "benchwrapper": "node bin/benchwrapper.js", "prelint": "cd samples; npm link ../; npm install", - "precompile": "gts clean", "api-extractor": "api-extractor run --local", - "api-documenter": "api-documenter yaml --input-folder=temp" + "api-documenter": "api-documenter yaml --input-folder=temp", + "precompile": "gts clean" }, "dependencies": { "@google-cloud/paginator": "^3.0.0", "@google-cloud/precise-date": "^2.0.0", "@google-cloud/projectify": "^2.0.0", "@google-cloud/promisify": "^2.0.0", + "@opentelemetry/api": "^0.9.0", + "@opentelemetry/tracing": "^0.9.0", "@types/duplexify": "^3.6.0", "@types/long": "^4.0.0", "arrify": "^2.0.0", diff --git a/samples/opentelemetryTracing.js b/samples/opentelemetryTracing.js new file mode 100644 index 000000000..b4f9e37e9 --- /dev/null +++ b/samples/opentelemetryTracing.js @@ -0,0 +1,108 @@ +/*! + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This sample demonstrates how to add OpenTelemetry tracing to the + * Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +'use strict'; + +// sample-metadata: +// title: OpenTelemetry Tracing +// description: Demonstrates how to enable OpenTelemetry tracing in +// a publisher or subscriber. +// usage: node opentelemetryTracing.js + +const SUBSCRIBER_TIMEOUT = 10; + +function main( + topicName = 'YOUR_TOPIC_NAME', + subscriptionName = 'YOUR_SUBSCRIPTION_NAME', + data = {foo: 'bar'} +) { + // [START opentelemetry_tracing] + /** + * TODO(developer): Uncomment these variables before running the sample. + */ + // const topicName = 'my-topic'; + // const subscriptionName = 'my-subscription'; + // const data = 'Hello, world!"; + + // Imports the Google Cloud client library + const {PubSub} = require('@google-cloud/pubsub'); + + // Imports the OpenTelemetry API + const {opentelemetry} = require('@opentelemetry/api'); + + // Imports the OpenTelemetry span handlers and exporter + const { + SimpleSpanProcessor, + BasicTracerProvider, + ConsoleSpanExporter, + } = require('@opentelemetry/tracing'); + + // Set up span processing and specify the console as the span exporter + const provider = new BasicTracerProvider(); + const exporter = new ConsoleSpanExporter(); + provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); + + provider.register(); + opentelemetry.trace.setGlobalTracerProvider(provider); + + // OpenTelemetry tracing is an optional feature and can be enabled by setting + // enableOpenTelemetryTraceing as a publisher or subscriber option + const enableOpenTelemetryTracing = { + enableOpenTelemetryTracing: true, + }; + + // Creates a client; cache this for further use + const pubSubClient = new PubSub(); + + async function publishMessage() { + // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) + const dataBuffer = Buffer.from(data); + + const messageId = await pubSubClient + .topic(topicName, enableOpenTelemetryTracing) + .publish(dataBuffer); + console.log(`Message ${messageId} published.`); + } + + async function subscriptionListen() { + // Message handler for subscriber + const messageHandler = message => { + console.log(`Message ${message.id} received.`); + message.ack(); + }; + + // Listens for new messages from the topic + pubSubClient.subscription(subscriptionName).on('message', messageHandler); + setTimeout(() => { + pubSubClient + .subscription(subscriptionName, enableOpenTelemetryTracing) + .removeAllListeners(); + }, SUBSCRIBER_TIMEOUT * 1000); + } + + publishMessage().then(subscriptionListen()); + // [END opentelemetry_tracing] +} + +main(...process.argv.slice(2)); diff --git a/samples/package.json b/samples/package.json index 1a668919d..80822afcc 100644 --- a/samples/package.json +++ b/samples/package.json @@ -14,6 +14,8 @@ "test": "mocha system-test --timeout 600000" }, "dependencies": { + "@opentelemetry/api": "^0.10.2", + "@opentelemetry/tracing": "^0.10.2", "@google-cloud/pubsub": "^2.4.0" }, "devDependencies": { diff --git a/src/opentelemetry-tracing.ts b/src/opentelemetry-tracing.ts new file mode 100644 index 000000000..3c51dd440 --- /dev/null +++ b/src/opentelemetry-tracing.ts @@ -0,0 +1,43 @@ +/*! + * Copyright 2020 Google LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {Attributes, SpanContext, Span, trace} from '@opentelemetry/api'; +import {Tracer} from '@opentelemetry/tracing'; + +/** + * Wrapper for creating OpenTelemetry Spans + * + * @class + */ +export class OpenTelemetryTracer { + /** + * Creates a new span with the given properties + * + * @param {string} spanName the name for the span + * @param {Attributes?} attributes an object containing the attributes to be set for the span + * @param {SpanContext?} parent the context of the parent span to link to the span + */ + createSpan( + spanName: string, + attributes?: Attributes, + parent?: SpanContext + ): Span { + const tracerProvider: Tracer = trace.getTracer('default') as Tracer; + return tracerProvider.startSpan(spanName, { + parent: parent, + attributes: attributes, + }); + } +} diff --git a/src/publisher/index.ts b/src/publisher/index.ts index a5b44b874..0dc0099d3 100644 --- a/src/publisher/index.ts +++ b/src/publisher/index.ts @@ -17,6 +17,7 @@ import {promisify, promisifyAll} from '@google-cloud/promisify'; import * as extend from 'extend'; import {CallOptions} from 'google-gax'; +import {Span} from '@opentelemetry/api'; import {BatchPublishOptions} from './message-batch'; import {Queue, OrderedQueue} from './message-queues'; @@ -24,6 +25,7 @@ import {Topic} from '../topic'; import {RequestCallback, EmptyCallback} from '../pubsub'; import {google} from '../../protos/protos'; import {defaultOptions} from '../default-options'; +import {OpenTelemetryTracer} from '../opentelemetry-tracing'; export type PubsubMessage = google.pubsub.v1.IPubsubMessage; @@ -37,6 +39,7 @@ export interface PublishOptions { batching?: BatchPublishOptions; gaxOpts?: CallOptions; messageOrdering?: boolean; + enableOpenTelemetryTracing?: boolean; } /** @@ -72,11 +75,16 @@ export class Publisher { settings!: PublishOptions; queue: Queue; orderedQueues: Map; + tracing: OpenTelemetryTracer | undefined; constructor(topic: Topic, options?: PublishOptions) { this.setOptions(options); this.topic = topic; this.queue = new Queue(this); this.orderedQueues = new Map(); + this.tracing = + this.settings && this.settings.enableOpenTelemetryTracing + ? new OpenTelemetryTracer() + : undefined; } flush(): Promise; @@ -162,8 +170,13 @@ export class Publisher { } } + const span: Span | undefined = this.constructSpan(message); + if (!message.orderingKey) { this.queue.add(message, callback); + if (span) { + span.end(); + } return; } @@ -177,6 +190,10 @@ export class Publisher { const queue = this.orderedQueues.get(key)!; queue.add(message, callback); + + if (span) { + span.end(); + } } /** * Indicates to the publisher that it is safe to continue publishing for the @@ -211,13 +228,19 @@ export class Publisher { gaxOpts: { isBundling: false, }, + enableOpenTelemetryTracing: false, }; - const {batching, gaxOpts, messageOrdering} = extend( - true, - defaults, - options - ); + const { + batching, + gaxOpts, + messageOrdering, + enableOpenTelemetryTracing, + } = extend(true, defaults, options); + + this.tracing = enableOpenTelemetryTracing + ? new OpenTelemetryTracer() + : undefined; this.settings = { batching: { @@ -227,11 +250,45 @@ export class Publisher { }, gaxOpts, messageOrdering, + enableOpenTelemetryTracing, }; } + + /** + * Constructs an OpenTelemetry span + * + * @private + * + * @param {PubsubMessage} message The message to create a span for + */ + constructSpan(message: PubsubMessage): Span | undefined { + const spanAttributes = { + data: message.data, + }; + const span: Span | undefined = this.tracing + ? this.tracing.createSpan(`${this.topic.name} publisher`, spanAttributes) + : undefined; + if (span) { + if ( + message.attributes && + message.attributes['googclient_OpenTelemetrySpanContext'] + ) { + console.warn( + 'googclient_OpenTelemetrySpanContext key set as message attribute, but will be overridden.' + ); + } + if (!message.attributes) { + message.attributes = {}; + } + message.attributes[ + 'googclient_OpenTelemetrySpanContext' + ] = JSON.stringify(span.context()); + } + return span; + } } promisifyAll(Publisher, { singular: true, - exclude: ['publish', 'setOptions'], + exclude: ['publish', 'setOptions', 'constructSpan'], }); diff --git a/src/subscriber.ts b/src/subscriber.ts index eee532dc6..9b746636c 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -18,6 +18,7 @@ import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; import {replaceProjectIdToken} from '@google-cloud/projectify'; import {promisify} from '@google-cloud/promisify'; import {EventEmitter} from 'events'; +import {SpanContext, Span} from '@opentelemetry/api'; import {google} from '../protos/protos'; import {Histogram} from './histogram'; @@ -27,6 +28,7 @@ import {MessageStream, MessageStreamOptions} from './message-stream'; import {Subscription} from './subscription'; import {defaultOptions} from './default-options'; import {SubscriberClient} from './v1'; +import {OpenTelemetryTracer} from './opentelemetry-tracing'; export type PullResponse = google.pubsub.v1.IPullResponse; @@ -202,6 +204,7 @@ export interface SubscriberOptions { batching?: BatchOptions; flowControl?: FlowControlOptions; streamingOptions?: MessageStreamOptions; + enableOpenTelemetryTracing?: boolean; } /** @@ -237,6 +240,7 @@ export class Subscriber extends EventEmitter { private _options!: SubscriberOptions; private _stream!: MessageStream; private _subscription: Subscription; + private _tracing: OpenTelemetryTracer | undefined; constructor(subscription: Subscription, options = {}) { super(); @@ -248,7 +252,6 @@ export class Subscriber extends EventEmitter { this._histogram = new Histogram({min: 10, max: 600}); this._latencies = new Histogram(); this._subscription = subscription; - this.setOptions(options); } /** @@ -423,7 +426,42 @@ export class Subscriber extends EventEmitter { this.maxMessages ); } + this._tracing = options.enableOpenTelemetryTracing + ? new OpenTelemetryTracer() + : undefined; + } + + /** + * Constructs an OpenTelemetry span from the incoming message. + * + * @param {Message} message One of the received messages + * @private + */ + private _constructSpan(message: Message): Span | undefined { + // Handle cases where OpenTelemetry is disabled or no span context was sent through message + if ( + !this._tracing || + !message.attributes || + !message.attributes['googclient_OpenTelemetrySpanContext'] + ) { + return undefined; + } + const spanValue = message.attributes['googclient_OpenTelemetrySpanContext']; + const parentSpanContext: SpanContext | undefined = spanValue + ? JSON.parse(spanValue) + : undefined; + const spanAttributes = { + ackId: message.ackId, + deliveryAttempt: message.deliveryAttempt, + }; + // Subscriber spans should always have a publisher span as a parent. + // Return undefined if no parent is provided + const span = parentSpanContext + ? this._tracing.createSpan(this._name, spanAttributes, parentSpanContext) + : undefined; + return span; } + /** * Callback to be invoked when a new message is available. * @@ -445,12 +483,16 @@ export class Subscriber extends EventEmitter { for (const data of receivedMessages!) { const message = new Message(this, data); + const span: Span | undefined = this._constructSpan(message); if (this.isOpen) { message.modAck(this.ackDeadline); this._inventory.add(message); } else { message.nack(); } + if (span) { + span.end(); + } } } diff --git a/test/opentelemetry-tracing.ts b/test/opentelemetry-tracing.ts new file mode 100644 index 000000000..032a23533 --- /dev/null +++ b/test/opentelemetry-tracing.ts @@ -0,0 +1,63 @@ +/*! + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {describe, it, before, beforeEach, afterEach} from 'mocha'; + +import * as api from '@opentelemetry/api'; +import * as trace from '@opentelemetry/tracing'; +import {OpenTelemetryTracer} from '../src/opentelemetry-tracing'; +import {SimpleSpanProcessor} from '@opentelemetry/tracing'; + +describe('OpenTelemetryTracer', () => { + let tracing: OpenTelemetryTracer; + let span: trace.Span; + const spanName = 'test-span'; + const spanContext: api.SpanContext = { + traceId: 'd4cda95b652f4a1592b449d5929fda1b', + spanId: '6e0c63257de34c92', + traceFlags: api.TraceFlags.SAMPLED, + }; + const spanAttributes: api.Attributes = { + foo: 'bar', + }; + + before(() => { + const provider = new trace.BasicTracerProvider(); + const exporter = new trace.InMemorySpanExporter(); + provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); + api.trace.setGlobalTracerProvider(provider); + }); + + beforeEach(() => { + tracing = new OpenTelemetryTracer(); + }); + + afterEach(() => { + span.end(); + }); + + it('creates a span', () => { + span = tracing.createSpan( + spanName, + spanAttributes, + spanContext + ) as trace.Span; + assert.strictEqual(span.name, spanName); + assert.deepStrictEqual(span.attributes, spanAttributes); + assert.strictEqual(span.parentSpanId, spanContext.spanId); + }); +}); diff --git a/test/publisher/index.ts b/test/publisher/index.ts index 910b1df27..8af07b479 100644 --- a/test/publisher/index.ts +++ b/test/publisher/index.ts @@ -20,6 +20,12 @@ import {describe, it, before, beforeEach, afterEach} from 'mocha'; import {EventEmitter} from 'events'; import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; +import { + BasicTracerProvider, + InMemorySpanExporter, + SimpleSpanProcessor, +} from '@opentelemetry/tracing'; +import * as opentelemetry from '@opentelemetry/api'; import {Topic} from '../../src'; import * as p from '../../src/publisher'; @@ -36,7 +42,11 @@ const fakePromisify = Object.assign({}, pfy, { } promisified = true; assert.ok(options.singular); - assert.deepStrictEqual(options.exclude, ['publish', 'setOptions']); + assert.deepStrictEqual(options.exclude, [ + 'publish', + 'setOptions', + 'constructSpan', + ]); }, }); @@ -146,6 +156,58 @@ describe('Publisher', () => { }); }); + describe('OpenTelemetry tracing', () => { + let tracingPublisher: p.Publisher = {} as p.Publisher; + const enableTracing: p.PublishOptions = { + enableOpenTelemetryTracing: true, + }; + const disableTracing: p.PublishOptions = { + enableOpenTelemetryTracing: false, + }; + const buffer = Buffer.from('Hello, world!'); + + beforeEach(() => { + // Declare tracingPublisher as type any and pre-define _tracing + // to gain access to the private field after publisher init + tracingPublisher['tracing'] = undefined; + }); + it('should not instantiate a tracer when tracing is disabled', () => { + tracingPublisher = new Publisher(topic); + assert.strictEqual(tracingPublisher['tracing'], undefined); + }); + + it('should instantiate a tracer when tracing is enabled through constructor', () => { + tracingPublisher = new Publisher(topic, enableTracing); + assert.ok(tracingPublisher['tracing']); + }); + + it('should instantiate a tracer when tracing is enabled through setOptions', () => { + tracingPublisher = new Publisher(topic); + tracingPublisher.setOptions(enableTracing); + assert.ok(tracingPublisher['tracing']); + }); + + it('should disable tracing when tracing is disabled through setOptions', () => { + tracingPublisher = new Publisher(topic, enableTracing); + tracingPublisher.setOptions(disableTracing); + assert.strictEqual(tracingPublisher['tracing'], undefined); + }); + + it('export created spans', () => { + tracingPublisher = new Publisher(topic, enableTracing); + + // Setup trace exporting + const provider: BasicTracerProvider = new BasicTracerProvider(); + const exporter: InMemorySpanExporter = new InMemorySpanExporter(); + provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); + provider.register(); + opentelemetry.trace.setGlobalTracerProvider(provider); + + tracingPublisher.publish(buffer); + assert.ok(exporter.getFinishedSpans()); + }); + }); + describe('publishMessage', () => { const data = Buffer.from('hello, world!'); const spy = sandbox.spy(); @@ -266,6 +328,20 @@ describe('Publisher', () => { done(); }); }); + + it('should issue a warning if OpenTelemetry span context key is set', () => { + const warnSpy = sinon.spy(console, 'warn'); + const attributes = { + googclient_OpenTelemetrySpanContext: 'foobar', + }; + const fakeMessageWithOTKey = {data, attributes}; + const publisherTracing = new Publisher(topic, { + enableOpenTelemetryTracing: true, + }); + publisherTracing.publishMessage(fakeMessageWithOTKey, warnSpy); + assert.ok(warnSpy.called); + warnSpy.restore(); + }); }); }); @@ -299,6 +375,7 @@ describe('Publisher', () => { gaxOpts: { isBundling: false, }, + enableOpenTelemetryTracing: false, }); }); @@ -313,6 +390,7 @@ describe('Publisher', () => { gaxOpts: { isBundling: true, }, + enableOpenTelemetryTracing: true, }; publisher.setOptions(options); diff --git a/test/subscriber.ts b/test/subscriber.ts index 7b69a8d2a..82064954f 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -22,6 +22,12 @@ import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; import {PassThrough} from 'stream'; import * as uuid from 'uuid'; +import { + SimpleSpanProcessor, + BasicTracerProvider, + InMemorySpanExporter, +} from '@opentelemetry/tracing'; +import * as opentelemetry from '@opentelemetry/api'; import {HistogramOptions} from '../src/histogram'; import {FlowControlOptions} from '../src/lease-manager'; @@ -631,6 +637,103 @@ describe('Subscriber', () => { }); }); + describe('OpenTelemetry tracing', () => { + let tracingSubscriber: s.Subscriber = {} as s.Subscriber; + const enableTracing: s.SubscriberOptions = { + enableOpenTelemetryTracing: true, + }; + const disableTracing: s.SubscriberOptions = { + enableOpenTelemetryTracing: false, + }; + + beforeEach(() => { + // Pre-define _tracing to gain access to the private field after subscriber init + tracingSubscriber['_tracing'] = undefined; + }); + + it('should not instantiate a tracer when tracing is disabled', () => { + tracingSubscriber = new Subscriber(subscription); + assert.strictEqual(tracingSubscriber['_tracing'], undefined); + }); + + it('should instantiate a tracer when tracing is enabled through constructor', () => { + tracingSubscriber = new Subscriber(subscription, enableTracing); + assert.ok(tracingSubscriber['_tracing']); + }); + + it('should instantiate a tracer when tracing is enabled through setOptions', () => { + tracingSubscriber = new Subscriber(subscription); + tracingSubscriber.setOptions(enableTracing); + assert.ok(tracingSubscriber['_tracing']); + }); + + it('should disable tracing when tracing is disabled through setOptions', () => { + tracingSubscriber = new Subscriber(subscription, enableTracing); + tracingSubscriber.setOptions(disableTracing); + assert.strictEqual(tracingSubscriber['_tracing'], undefined); + }); + + it('exports a span once it is created', () => { + tracingSubscriber = new Subscriber(subscription, enableTracing); + + // Setup trace exporting + const provider: BasicTracerProvider = new BasicTracerProvider(); + const exporter: InMemorySpanExporter = new InMemorySpanExporter(); + provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); + provider.register(); + opentelemetry.trace.setGlobalTracerProvider(provider); + + // Construct mock of received message with span context + const parentSpanContext: opentelemetry.SpanContext = { + traceId: 'd4cda95b652f4a1592b449d5929fda1b', + spanId: '6e0c63257de34c92', + traceFlags: opentelemetry.TraceFlags.SAMPLED, + }; + const messageWithSpanContext = { + ackId: uuid.v4(), + message: { + attributes: { + googclient_OpenTelemetrySpanContext: JSON.stringify( + parentSpanContext + ), + }, + data: Buffer.from('Hello, world!'), + messageId: uuid.v4(), + orderingKey: 'ordering-key', + publishTime: {seconds: 12, nanos: 32}, + }, + }; + const pullResponse: s.PullResponse = { + receivedMessages: [messageWithSpanContext], + }; + + // Receive message and assert that it was exported + const stream: FakeMessageStream = stubs.get('messageStream'); + stream.emit('data', pullResponse); + assert.ok(exporter.getFinishedSpans()); + }); + + it('does not export a span when a span context is not present on message', () => { + tracingSubscriber = new Subscriber(subscription, enableTracing); + + // Setup trace exporting + const provider: BasicTracerProvider = new BasicTracerProvider(); + const exporter: InMemorySpanExporter = new InMemorySpanExporter(); + provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); + provider.register(); + opentelemetry.trace.setGlobalTracerProvider(provider); + + const pullResponse: s.PullResponse = { + receivedMessages: [RECEIVED_MESSAGE], + }; + + // Receive message and assert that it was exported + const stream: FakeMessageStream = stubs.get('messageStream'); + stream.emit('data', pullResponse); + assert.strictEqual(exporter.getFinishedSpans().length, 0); + }); + }); + describe('Message', () => { describe('initialization', () => { it('should localize ackId', () => {