Skip to content

Commit

Permalink
feat: Opentelemetry integration (#1078)
Browse files Browse the repository at this point in the history
* Add opentelemetry tracing

* build: rename _toc to toc (#1066)

* changes without context

        autosynth cannot find the source of changes triggered by earlier changes in this
        repository, or by version upgrades to tools such as linters.

* fix: rename _toc to toc

Source-Author: F. Hinkelmann <franziska.hinkelmann@gmail.com>
Source-Date: Tue Jul 21 10:53:20 2020 -0400
Source-Repo: googleapis/synthtool
Source-Sha: 99c93fe09f8c1dca09dfc0301c8668e3a70dd796
Source-Link: googleapis/synthtool@99c93fe

Co-authored-by: sofisl <55454395+sofisl@users.noreply.github.com>

* build: move gitattributes files to node templates (#1070)

Source-Author: F. Hinkelmann <franziska.hinkelmann@gmail.com>
Source-Date: Thu Jul 23 01:45:04 2020 -0400
Source-Repo: googleapis/synthtool
Source-Sha: 3a00b7fea8c4c83eaff8eb207f530a2e3e8e1de3
Source-Link: googleapis/synthtool@3a00b7f

* Add opentelemetry instrumentation

* Add create span test

* Refactor tracing

* Add publisher key test

* Fix linting issues

* Add docs

* Add example for opentelemetry

* Add tracing example

* Update headers

* Add microsoft api documenter

* Fix linting in samples/package.json

* Add optional tracing

* Fix linting issues

* Re-add api-documenter

* Update package.json

* Update package.json

* Update package.json

* Fix docs

* Add more unit tests

* Fix linting

* Add disable tracing tests

* Update opentelemetryTracing sample

Co-authored-by: Yoshi Automation Bot <yoshi-automation@google.com>
Co-authored-by: sofisl <55454395+sofisl@users.noreply.github.com>
Co-authored-by: Benjamin E. Coe <bencoe@google.com>
Co-authored-by: Megan Potter <57276408+feywind@users.noreply.github.com>
  • Loading branch information
5 people authored Aug 14, 2020
1 parent 80e0ee3 commit 76db007
Show file tree
Hide file tree
Showing 9 changed files with 509 additions and 11 deletions.
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"presystem-test": "npm run compile",
"system-test": "mocha build/system-test --timeout 600000",
"samples-test": "cd samples/ && npm link ../ && npm install && npm test && cd ../",
"test": "c8 mocha build/test",
"test": "c8 mocha build/test --recursive",
"lint": "gts check",
"predocs": "npm run compile",
"docs": "jsdoc -c .jsdoc.js",
Expand All @@ -44,15 +44,17 @@
"predocs-test": "npm run docs",
"benchwrapper": "node bin/benchwrapper.js",
"prelint": "cd samples; npm link ../; npm install",
"precompile": "gts clean",
"api-extractor": "api-extractor run --local",
"api-documenter": "api-documenter yaml --input-folder=temp"
"api-documenter": "api-documenter yaml --input-folder=temp",
"precompile": "gts clean"
},
"dependencies": {
"@google-cloud/paginator": "^3.0.0",
"@google-cloud/precise-date": "^2.0.0",
"@google-cloud/projectify": "^2.0.0",
"@google-cloud/promisify": "^2.0.0",
"@opentelemetry/api": "^0.9.0",
"@opentelemetry/tracing": "^0.9.0",
"@types/duplexify": "^3.6.0",
"@types/long": "^4.0.0",
"arrify": "^2.0.0",
Expand Down
108 changes: 108 additions & 0 deletions samples/opentelemetryTracing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*!
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This sample demonstrates how to add OpenTelemetry tracing to the
* Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

'use strict';

// sample-metadata:
// title: OpenTelemetry Tracing
// description: Demonstrates how to enable OpenTelemetry tracing in
// a publisher or subscriber.
// usage: node opentelemetryTracing.js <topic-name> <subscription-name>

const SUBSCRIBER_TIMEOUT = 10;

function main(
topicName = 'YOUR_TOPIC_NAME',
subscriptionName = 'YOUR_SUBSCRIPTION_NAME',
data = {foo: 'bar'}
) {
// [START opentelemetry_tracing]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'my-topic';
// const subscriptionName = 'my-subscription';
// const data = 'Hello, world!";

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {opentelemetry} = require('@opentelemetry/api');

// Imports the OpenTelemetry span handlers and exporter
const {
SimpleSpanProcessor,
BasicTracerProvider,
ConsoleSpanExporter,
} = require('@opentelemetry/tracing');

// Set up span processing and specify the console as the span exporter
const provider = new BasicTracerProvider();
const exporter = new ConsoleSpanExporter();
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));

provider.register();
opentelemetry.trace.setGlobalTracerProvider(provider);

// OpenTelemetry tracing is an optional feature and can be enabled by setting
// enableOpenTelemetryTraceing as a publisher or subscriber option
const enableOpenTelemetryTracing = {
enableOpenTelemetryTracing: true,
};

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage() {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const messageId = await pubSubClient
.topic(topicName, enableOpenTelemetryTracing)
.publish(dataBuffer);
console.log(`Message ${messageId} published.`);
}

async function subscriptionListen() {
// Message handler for subscriber
const messageHandler = message => {
console.log(`Message ${message.id} received.`);
message.ack();
};

// Listens for new messages from the topic
pubSubClient.subscription(subscriptionName).on('message', messageHandler);
setTimeout(() => {
pubSubClient
.subscription(subscriptionName, enableOpenTelemetryTracing)
.removeAllListeners();
}, SUBSCRIBER_TIMEOUT * 1000);
}

publishMessage().then(subscriptionListen());
// [END opentelemetry_tracing]
}

main(...process.argv.slice(2));
2 changes: 2 additions & 0 deletions samples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"test": "mocha system-test --timeout 600000"
},
"dependencies": {
"@opentelemetry/api": "^0.10.2",
"@opentelemetry/tracing": "^0.10.2",
"@google-cloud/pubsub": "^2.4.0"
},
"devDependencies": {
Expand Down
43 changes: 43 additions & 0 deletions src/opentelemetry-tracing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*!
* Copyright 2020 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {Attributes, SpanContext, Span, trace} from '@opentelemetry/api';
import {Tracer} from '@opentelemetry/tracing';

/**
* Wrapper for creating OpenTelemetry Spans
*
* @class
*/
export class OpenTelemetryTracer {
/**
* Creates a new span with the given properties
*
* @param {string} spanName the name for the span
* @param {Attributes?} attributes an object containing the attributes to be set for the span
* @param {SpanContext?} parent the context of the parent span to link to the span
*/
createSpan(
spanName: string,
attributes?: Attributes,
parent?: SpanContext
): Span {
const tracerProvider: Tracer = trace.getTracer('default') as Tracer;
return tracerProvider.startSpan(spanName, {
parent: parent,
attributes: attributes,
});
}
}
69 changes: 63 additions & 6 deletions src/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import {promisify, promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import {CallOptions} from 'google-gax';
import {Span} from '@opentelemetry/api';

import {BatchPublishOptions} from './message-batch';
import {Queue, OrderedQueue} from './message-queues';
import {Topic} from '../topic';
import {RequestCallback, EmptyCallback} from '../pubsub';
import {google} from '../../protos/protos';
import {defaultOptions} from '../default-options';
import {OpenTelemetryTracer} from '../opentelemetry-tracing';

export type PubsubMessage = google.pubsub.v1.IPubsubMessage;

Expand All @@ -37,6 +39,7 @@ export interface PublishOptions {
batching?: BatchPublishOptions;
gaxOpts?: CallOptions;
messageOrdering?: boolean;
enableOpenTelemetryTracing?: boolean;
}

/**
Expand Down Expand Up @@ -72,11 +75,16 @@ export class Publisher {
settings!: PublishOptions;
queue: Queue;
orderedQueues: Map<string, OrderedQueue>;
tracing: OpenTelemetryTracer | undefined;
constructor(topic: Topic, options?: PublishOptions) {
this.setOptions(options);
this.topic = topic;
this.queue = new Queue(this);
this.orderedQueues = new Map();
this.tracing =
this.settings && this.settings.enableOpenTelemetryTracing
? new OpenTelemetryTracer()
: undefined;
}

flush(): Promise<void>;
Expand Down Expand Up @@ -162,8 +170,13 @@ export class Publisher {
}
}

const span: Span | undefined = this.constructSpan(message);

if (!message.orderingKey) {
this.queue.add(message, callback);
if (span) {
span.end();
}
return;
}

Expand All @@ -177,6 +190,10 @@ export class Publisher {

const queue = this.orderedQueues.get(key)!;
queue.add(message, callback);

if (span) {
span.end();
}
}
/**
* Indicates to the publisher that it is safe to continue publishing for the
Expand Down Expand Up @@ -211,13 +228,19 @@ export class Publisher {
gaxOpts: {
isBundling: false,
},
enableOpenTelemetryTracing: false,
};

const {batching, gaxOpts, messageOrdering} = extend(
true,
defaults,
options
);
const {
batching,
gaxOpts,
messageOrdering,
enableOpenTelemetryTracing,
} = extend(true, defaults, options);

this.tracing = enableOpenTelemetryTracing
? new OpenTelemetryTracer()
: undefined;

this.settings = {
batching: {
Expand All @@ -227,11 +250,45 @@ export class Publisher {
},
gaxOpts,
messageOrdering,
enableOpenTelemetryTracing,
};
}

/**
* Constructs an OpenTelemetry span
*
* @private
*
* @param {PubsubMessage} message The message to create a span for
*/
constructSpan(message: PubsubMessage): Span | undefined {
const spanAttributes = {
data: message.data,
};
const span: Span | undefined = this.tracing
? this.tracing.createSpan(`${this.topic.name} publisher`, spanAttributes)
: undefined;
if (span) {
if (
message.attributes &&
message.attributes['googclient_OpenTelemetrySpanContext']
) {
console.warn(
'googclient_OpenTelemetrySpanContext key set as message attribute, but will be overridden.'
);
}
if (!message.attributes) {
message.attributes = {};
}
message.attributes[
'googclient_OpenTelemetrySpanContext'
] = JSON.stringify(span.context());
}
return span;
}
}

promisifyAll(Publisher, {
singular: true,
exclude: ['publish', 'setOptions'],
exclude: ['publish', 'setOptions', 'constructSpan'],
});
Loading

0 comments on commit 76db007

Please sign in to comment.